[packetary] Introduce driver for Debian repositories

Change-Id: Icb8f6964bd0a9080ace50b4786ac76ae5dbb9976
Implements: blueprint refactor-local-mirror-scripts
Partial-Bug: #1487077
Bulat Gaifullin 2015-10-20 18:33:58 +03:00
parent 671af8e611
commit aa570b1f98
15 changed files with 926 additions and 33 deletions
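
For orientation, the new driver consumes apt sources.list-style entries and works with the standard dists/<suite>/<component>/binary-<arch>/ archive layout. Below is a minimal usage sketch based on the unit tests added in this change; the mirror URL is only an example and not part of the change.

# Minimal usage sketch (mirror URL is illustrative only).
from packetary.drivers.deb_driver import DebRepositoryDriver

driver = DebRepositoryDriver()
# parse_urls() turns sources.list-style lines into
# (base_url, suite, component) tuples, one per component.
urls = driver.parse_urls(
    ["http://archive.ubuntu.com/ubuntu/dists/ trusty main restricted"]
)
print(list(urls))
# [('http://archive.ubuntu.com/ubuntu', 'trusty', 'main'),
#  ('http://archive.ubuntu.com/ubuntu', 'trusty', 'restricted')]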


@@ -145,17 +145,17 @@ class RepositoryController(object):
with self.context.async_section(0) as section:
for r in repositories:
section.execute(
self._clone_repository,
self._fork_repository,
r, destination, source, locale, mirros
)
return mirros
def _clone_repository(self, r, destination, source, locale, mirrors):
def _fork_repository(self, r, destination, source, locale, mirrors):
"""Creates clone of repository and stores it in mirrors."""
clone = self.driver.clone_repository(
new_repository = self.driver.fork_repository(
self.context.connection, r, destination, source, locale
)
mirrors[r] = clone
mirrors[r] = new_repository
def _copy_package(self, target, package, observer):
"""Synchronises remote file to local fs."""


@@ -59,9 +59,9 @@ class RepositoryDriverBase(object):
"""
@abc.abstractmethod
def clone_repository(self, connection, repository, destination,
source=False, locale=False):
"""Creates copy of repository.
def fork_repository(self, connection, repository, destination,
source=False, locale=False):
"""Creates the new repository with same metadata.
:param connection: the connection manager instance
:param repository: the source repository


@@ -0,0 +1,373 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import closing
import copy
import datetime
import fcntl
import gzip
import os
from debian import deb822
from debian import debfile
from debian.debian_support import Version
import six
from packetary.drivers.base import RepositoryDriverBase
from packetary.library.checksum import composite as checksum_composite
from packetary.library.streams import GzipDecompress
from packetary.library import utils
from packetary.objects import FileChecksum
from packetary.objects import Package
from packetary.objects import PackageRelation
from packetary.objects import Repository
_OPERATORS_MAPPING = {
'>>': 'gt',
'<<': 'lt',
'=': 'eq',
'>=': 'ge',
'<=': 'le',
}
_ARCHITECTURES = {
"x86_64": "amd64",
"i386": "i386",
"source": "Source",
"amd64": "x86_64",
}
_PRIORITIES = {
"required": 1,
"important": 2,
"standard": 3,
"optional": 4,
"extra": 5
}
# Order is important
_REPOSITORY_FILES = [
"Packages",
"Release",
"Packages.gz"
]
# TODO(should be configurable)
_MANDATORY_PRIORITY = 3
_CHECKSUM_METHODS = (
"MD5Sum",
"SHA1",
"SHA256"
)
_checksum_collector = checksum_composite('md5', 'sha1', 'sha256')
class DebRepositoryDriver(RepositoryDriverBase):
def parse_urls(self, urls):
"""Overrides method of superclass."""
for url in urls:
try:
tokens = iter(x for x in url.split(" ") if x)
base, suite = next(tokens), next(tokens)
components = list(tokens)
except StopIteration:
raise ValueError("Invalid url: {0}".format(url))
base = base.rstrip("/")
if base.endswith("/dists"):
base = base[:-6]
# TODO(Flat Repository Format[1])
# [1] https://wiki.debian.org/RepositoryFormat
for component in components:
yield (base, suite, component)
def get_repository(self, connection, url, arch, consumer):
"""Overrides method of superclass."""
base, suite, component = url
release = self._get_url_of_metafile(
(base, suite, component, arch), "Release"
)
deb_release = deb822.Release(connection.open_stream(release))
consumer(Repository(
name=(deb_release["Archive"], deb_release["Component"]),
architecture=arch,
origin=deb_release["origin"],
url=base + "/"
))
def get_packages(self, connection, repository, consumer):
"""Overrides method of superclass."""
index = self._get_url_of_metafile(repository, "Packages.gz")
stream = GzipDecompress(connection.open_stream(index))
self.logger.info("loading packages from %s ...", repository)
pkg_iter = deb822.Packages.iter_paragraphs(stream)
counter = 0
for dpkg in pkg_iter:
try:
consumer(Package(
repository=repository,
name=dpkg["package"],
version=Version(dpkg['version']),
filesize=int(dpkg.get('size', -1)),
filename=dpkg["filename"],
checksum=FileChecksum(
md5=dpkg.get("md5sum"),
sha1=dpkg.get("sha1"),
sha256=dpkg.get("sha256"),
),
mandatory=self._is_mandatory(dpkg),
# Recommends are installed by default (since Lucid)
requires=self._get_relations(
dpkg, "depends", "pre-depends", "recommends"
),
obsoletes=self._get_relations(dpkg, "replaces"),
provides=self._get_relations(dpkg, "provides"),
))
except KeyError as e:
self.logger.error(
"Malformed index %s - %s: %s",
repository, six.text_type(dpkg), six.text_type(e)
)
raise
counter += 1
self.logger.info("loaded: %d packages from %s.", counter, repository)
def rebuild_repository(self, repository, packages):
"""Overrides method of superclass."""
basedir = utils.get_path_from_url(repository.url)
index_file = utils.get_path_from_url(
self._get_url_of_metafile(repository, "Packages")
)
utils.ensure_dir_exist(os.path.dirname(index_file))
index_gz = index_file + ".gz"
count = 0
with open(index_file, "wb") as fd1:
with closing(gzip.open(index_gz, "wb")) as fd2:
writer = utils.composite_writer(fd1, fd2)
for pkg in packages:
filename = os.path.join(basedir, pkg.filename)
with closing(debfile.DebFile(filename)) as deb:
debcontrol = deb.debcontrol()
debcontrol.setdefault("Origin", repository.origin)
debcontrol["Size"] = str(pkg.filesize)
debcontrol["Filename"] = pkg.filename
for k, v in six.moves.zip(_CHECKSUM_METHODS, pkg.checksum):
debcontrol[k] = v
writer(debcontrol.dump())
writer("\n")
count += 1
self.logger.info("saved %d packages in %s", count, repository)
self._update_suite_index(repository)
def fork_repository(self, connection, repository, destination,
source=False, locale=False):
"""Overrides method of superclass."""
# TODO(download gpg)
# TODO(sources and locales)
if not destination.endswith(os.path.sep):
destination += os.path.sep
clone = copy.copy(repository)
clone.url = destination
packages_file = utils.get_path_from_url(
self._get_url_of_metafile(clone, "Packages")
)
release_file = utils.get_path_from_url(
self._get_url_of_metafile(clone, "Release")
)
self.logger.info("clone repository %s to %s", repository, destination)
utils.ensure_dir_exist(os.path.dirname(release_file))
release = deb822.Release()
release["Origin"] = repository.origin
release["Label"] = repository.origin
release["Archive"] = repository.name[0]
release["Component"] = repository.name[1]
release["Architecture"] = _ARCHITECTURES[repository.architecture]
with open(release_file, "wb") as fd:
release.dump(fd)
open(packages_file, "ab").close()
gzip.open(packages_file + ".gz", "ab").close()
return clone
def _update_suite_index(self, repository):
"""Updates the Release file in the suite."""
path = os.path.join(
utils.get_path_from_url(repository.url),
"dists", repository.name[0]
)
release_path = os.path.join(path, "Release")
self.logger.info(
"added repository suite release file: %s", release_path
)
with open(release_path, "a+b") as fd:
fcntl.flock(fd.fileno(), fcntl.LOCK_EX)
try:
fd.seek(0)
release = deb822.Release(fd)
self._add_to_release(release, repository)
for m in _CHECKSUM_METHODS:
release.setdefault(m, [])
self._add_files_to_release(
release, path, self._get_metafiles(repository)
)
fd.truncate(0)
release.dump(fd)
finally:
fcntl.flock(fd.fileno(), fcntl.LOCK_UN)
def _get_relations(self, dpkg, *names):
"""Gets the package relations.
:param dpkg: the debian-package object
:type dpkg: deb822.Packages
:param names: the relation names
:return: the list of PackageRelation objects
"""
relations = list()
for name in names:
for variants in dpkg.relations[name]:
relation = PackageRelation.from_args(
*(self._unparse_relation(v) for v in variants)
)
if relation is not None:
relations.append(relation)
return relations
def _get_metafiles(self, repository):
"""Gets the sequence of metafiles for repository."""
return (
utils.get_path_from_url(
self._get_url_of_metafile(repository, filename)
)
for filename in _REPOSITORY_FILES
)
@staticmethod
def _unparse_relation(relation):
"""Gets the relation parameters.
:param relation: the relation dict parsed by deb822
:return: tuple(name, version_compare, version_edge), or (name, None)
if the relation has no version constraint
"""
name = relation['name']
version = relation.get("version")
if version is None:
return name, None
else:
return name, _OPERATORS_MAPPING[version[0]], version[1]
@staticmethod
def _is_mandatory(dpkg):
"""Checks that package is mandatory.
:param dpkg: the debian-package object
:type dpkg: deb822.Packages
"""
if dpkg.get("essential") == "yes":
return True
return _PRIORITIES.get(
dpkg.get("priority"), _MANDATORY_PRIORITY + 1
) < _MANDATORY_PRIORITY
@staticmethod
def _get_url_of_metafile(repo_or_comps, filename):
"""Gets the URL of meta-file.
:param repo_or_comps: the repository object or
tuple(baseurl, suite, component, architecture)
:param filename: the name of meta-file
"""
if isinstance(repo_or_comps, Repository):
baseurl = repo_or_comps.url
suite, component = repo_or_comps.name
arch = repo_or_comps.architecture
else:
baseurl, suite, component, arch = repo_or_comps
return "/".join((
baseurl.rstrip("/"), "dists", suite, component,
"binary-" + _ARCHITECTURES[arch],
filename
))
@staticmethod
def _add_to_release(release, repository):
"""Adds repository information to debian release.
:param release: the deb822.Release instance
:param repository: the repository object
"""
# reset the date
release["Date"] = datetime.datetime.now().strftime(
"%a, %d %b %Y %H:%M:%S %Z"
)
release.setdefault("Origin", repository.origin)
release.setdefault("Label", repository.origin)
release.setdefault("Suite", repository.name[0])
release.setdefault("Codename", repository.name[0].split("-", 1)[0])
release.setdefault("Description", "The packages repository.")
keys = ("Architectures", "Components")
values = (repository.architecture, repository.name[1])
for key, value in six.moves.zip(keys, values):
if key in release:
release[key] = utils.append_token_to_string(
release[key],
value
)
else:
release[key] = value
@staticmethod
def _add_files_to_release(release, basepath, files):
"""Adds information about meta files to debian release.
:param release: the deb822.Release instance
:param basepath: the suite folder path
:param files: the sequence of files
"""
files_info = utils.get_size_and_checksum_for_files(
files, _checksum_collector
)
for filepath, size, cs in files_info:
fname = filepath[len(basepath) + 1:]
size = six.text_type(size)
for m, checksum in six.moves.zip(_CHECKSUM_METHODS, cs):
for v in release[m]:
if v["name"] == fname:
v[m] = checksum
v["size"] = size
break
else:
release[m].append(deb822.Deb822Dict({
m: checksum,
"size": size,
"name": fname
}))
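
As a reviewer aide, the index URLs used throughout the driver follow the standard Debian archive layout. A small standalone sketch of the join performed by _get_url_of_metafile above; the host, suite and component values are examples only.

# Illustration of the metafile URL layout assumed by the driver.
base, suite, component, arch = "http://host", "trusty", "main", "x86_64"
url = "/".join((
    base.rstrip("/"), "dists", suite, component,
    "binary-" + {"x86_64": "amd64", "i386": "i386"}[arch],
    "Packages.gz",
))
print(url)  # http://host/dists/trusty/main/binary-amd64/Packages.gz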


@@ -24,6 +24,7 @@ import six.moves.urllib_error as urlerror
import time
from packetary.library.streams import StreamWrapper
from packetary.library.utils import ensure_dir_exist
logger = logging.getLogger(__package__)
@@ -207,7 +208,7 @@ class ConnectionsManager(object):
except OSError as e:
if e.errno != errno.ENOENT:
raise
self._ensure_dir_exists(filename)
ensure_dir_exist(os.path.dirname(filename))
logger.info("download: %s from the offset: %d", url, offset)
@@ -226,16 +227,6 @@ class ConnectionsManager(object):
os.fsync(fd)
os.close(fd)
@staticmethod
def _ensure_dir_exists(dst):
"""Checks that directory exists and creates otherwise."""
target_dir = os.path.dirname(dst)
try:
os.makedirs(target_dir)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def _copy_stream(self, fd, url, offset):
"""Copies remote file to local.

packetary/library/utils.py (new file)

@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import with_statement
import errno
import os
import six
urlparse = six.moves.urllib.parse.urlparse
def append_token_to_string(tokens, token):
"""Adds new token to space separated list of tokens.
:param tokens: the 'sep' separated list
:param token: new item
"""
values = tokens.split()
if token not in values:
values.append(token)
values.sort()
return ' '.join(values)
return tokens
def composite_writer(*args):
"""Makes helper, that writes into several files simultaneously.
:param args: the list of file objects
:return: the callable object - writer
"""
def write(text):
"""Writes simultaneously to all files with utf-8 encoding control.
:param text: the text, that needs to write
"""
if isinstance(text, six.text_type):
text = text.encode("utf-8")
for arg in args:
arg.write(text)
return write
def get_size_and_checksum_for_files(files, checksum_algo):
"""Gets the path, size and checksum for files.
:param files: the sequence of files
:param checksum_algo: the checksum calculator
:return: the sequence of tuples (filename, size, checksum)
"""
for filename in files:
with open(filename, "rb") as fd:
size = os.fstat(fd.fileno()).st_size
checksum = checksum_algo(fd)
yield filename, size, checksum
def get_path_from_url(url):
"""Get the path from the URL.
:param url: the URL
:return: the filepath
:raises ValueError
"""
comps = urlparse(url, scheme="file")
if comps.scheme != "file":
raise ValueError(
"The absolute path is expected, actual have: {0}.".format(url)
)
return comps.path
def ensure_dir_exist(path):
"""Creates directory if it does not exist.
:param path: the full path to directory
"""
try:
os.makedirs(path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
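
A short usage sketch for the pure helpers above, consistent with the unit tests added later in this change; the argument values are examples only.

from packetary.library import utils

# append_token_to_string keeps the list sorted and de-duplicated; the deb
# driver uses it to extend "Architectures"/"Components" in a suite Release.
print(utils.append_token_to_string("amd64", "i386"))        # amd64 i386
print(utils.append_token_to_string("amd64 i386", "amd64"))  # amd64 i386

# get_path_from_url accepts only local paths / file:// URLs.
print(utils.get_path_from_url("file:///repo/dists/trusty/Release"))
# /repo/dists/trusty/Release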


@@ -0,0 +1,19 @@
Package: test
Source: test.src
Version: 1.1.1-1~u14.04+test
Architecture: all
Maintainer: Test
Installed-Size: 3509
Homepage: http://localhost/
Priority: required
Section: web
Filename: pool/main/t/test.deb
Size: 100
Depends: test2 (>= 0.8.16~exp9)|tes2-old, test3
Pre-Depends: test-main
Provides: file
Replaces: test-old
SHA256: 14d6e308d8699b7f9ba2fe1ef778c0e38cf295614d308039d687b6b097d50859
SHA1: 402bd18c145ae3b5344edf07f246be159397fd40
MD5sum: 1ae09f80109f40dfbfaf3ba423c8625a
Description: test package


@@ -0,0 +1,6 @@
Archive: trusty
Version: 14.04
Component: main
Origin: Ubuntu
Label: Ubuntu
Architecture: amd64


@@ -14,7 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import closing
import gzip
import mock
import six
class CallbacksAdapter(mock.MagicMock):
@@ -37,3 +40,25 @@ class CallbacksAdapter(mock.MagicMock):
callback(d)
else:
callback(data)
class Buffer(object):
"""Helper to hide BytesIO methods."""
def __init__(self, io):
self.io = io
self.reset()
def reset(self):
self.io.seek(0)
def read(self, s=-1):
return self.io.read(s)
def get_compressed(stream):
"""Gets compressed stream."""
compressed = six.BytesIO()
with closing(gzip.GzipFile(fileobj=compressed, mode="wb")) as gz:
gz.write(stream.read())
return Buffer(compressed)


@@ -108,10 +108,10 @@ class TestConnectionManager(base.TestCase):
self.assertEqual(1, manager.opener.open.call_count)
@mock.patch("packetary.library.connections.urllib.build_opener")
@mock.patch("packetary.library.connections.ensure_dir_exist")
@mock.patch("packetary.library.connections.os")
def test_retrieve_from_offset(self, os, *_):
manager = connections.ConnectionsManager()
os.path.mkdirs.side_effect = OSError(17, "")
os.stat.return_value = mock.MagicMock(st_size=10)
os.open.return_value = 1
response = mock.MagicMock()
@@ -125,10 +125,10 @@ class TestConnectionManager(base.TestCase):
os.close.assert_called_once_with(1)
@mock.patch("packetary.library.connections.urllib.build_opener")
@mock.patch("packetary.library.connections.ensure_dir_exist")
@mock.patch("packetary.library.connections.os")
def test_retrieve_non_existence(self, os, *_):
manager = connections.ConnectionsManager()
os.path.mkdirs.side_effect = OSError(17, "")
os.stat.side_effect = OSError(2, "")
os.open.return_value = 1
response = mock.MagicMock()
@@ -141,11 +141,13 @@ class TestConnectionManager(base.TestCase):
os.fsync.assert_called_once_with(1)
os.close.assert_called_once_with(1)
@mock.patch("packetary.library.connections.urllib.build_opener")
@mock.patch("packetary.library.connections.urllib.build_opener",
new=mock.MagicMock())
@mock.patch("packetary.library.connections.ensure_dir_exist",
new=mock.MagicMock())
@mock.patch("packetary.library.connections.os")
def test_retrieve_from_offset_fail(self, os, _, logger):
def test_retrieve_from_offset_fail(self, os, logger):
manager = connections.ConnectionsManager(retries_num=2)
os.path.mkdirs.side_effect = OSError(connections.errno.EACCES, "")
os.stat.return_value = mock.MagicMock(st_size=10)
os.open.return_value = 1
response = mock.MagicMock()


@@ -0,0 +1,267 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import os.path as path
import six
from packetary.drivers import deb_driver
from packetary.tests import base
from packetary.tests.stubs.generator import gen_package
from packetary.tests.stubs.generator import gen_repository
from packetary.tests.stubs.helpers import get_compressed
PACKAGES = path.join(path.dirname(__file__), "data", "Packages")
RELEASE = path.join(path.dirname(__file__), "data", "Release")
class TestDebDriver(base.TestCase):
@classmethod
def setUpClass(cls):
super(TestDebDriver, cls).setUpClass()
cls.driver = deb_driver.DebRepositoryDriver()
def setUp(self):
self.connection = mock.MagicMock()
def test_parse_urls(self):
self.assertItemsEqual(
[
("http://host", "trusty", "main"),
("http://host", "trusty", "restricted"),
],
self.driver.parse_urls(
["http://host/dists/ trusty main restricted"]
)
)
self.assertItemsEqual(
[("http://host", "trusty", "main")],
self.driver.parse_urls(
["http://host/dists trusty main"]
)
)
self.assertItemsEqual(
[("http://host", "trusty", "main")],
self.driver.parse_urls(
["http://host/ trusty main"]
)
)
self.assertItemsEqual(
[
("http://host", "trusty", "main"),
("http://host2", "trusty", "main"),
],
self.driver.parse_urls(
[
"http://host/ trusty main",
"http://host2/dists/ trusty main",
]
)
)
def test_get_repository(self):
repos = []
with open(RELEASE, "rb") as stream:
self.connection.open_stream.return_value = stream
self.driver.get_repository(
self.connection,
("http://host", "trusty", "main"),
"x86_64",
repos.append
)
self.connection.open_stream.assert_called_once_with(
"http://host/dists/trusty/main/binary-amd64/Release"
)
self.assertEqual(1, len(repos))
repo = repos[0]
self.assertEqual(("trusty", "main"), repo.name)
self.assertEqual("Ubuntu", repo.origin)
self.assertEqual("x86_64", repo.architecture)
self.assertEqual("http://host/", repo.url)
def test_get_packages(self):
packages = []
repo = gen_repository(name=("trusty", "main"), url="http://host/")
with open(PACKAGES, "rb") as s:
self.connection.open_stream.return_value = get_compressed(s)
self.driver.get_packages(
self.connection,
repo,
packages.append
)
self.connection.open_stream.assert_called_once_with(
"http://host/dists/trusty/main/binary-amd64/Packages.gz",
)
self.assertEqual(1, len(packages))
package = packages[0]
self.assertEqual("test", package.name)
self.assertEqual("1.1.1-1~u14.04+test", package.version)
self.assertEqual(100, package.filesize)
self.assertEqual(
deb_driver.FileChecksum(
'1ae09f80109f40dfbfaf3ba423c8625a',
'402bd18c145ae3b5344edf07f246be159397fd40',
'14d6e308d8699b7f9ba2fe1ef778c0e3'
'8cf295614d308039d687b6b097d50859'),
package.checksum
)
self.assertEqual(
"pool/main/t/test.deb", package.filename
)
self.assertTrue(package.mandatory)
self.assertItemsEqual(
[
'test-main (any)',
'test2 (ge 0.8.16~exp9) | tes2-old (any)',
'test3 (any)'
],
(str(x) for x in package.requires)
)
self.assertItemsEqual(
["file (any)"],
(str(x) for x in package.provides)
)
self.assertItemsEqual(
["test-old (any)"],
(str(x) for x in package.obsoletes)
)
@mock.patch.multiple(
"packetary.drivers.deb_driver",
deb822=mock.DEFAULT,
debfile=mock.DEFAULT,
fcntl=mock.DEFAULT,
gzip=mock.DEFAULT,
utils=mock.DEFAULT,
os=mock.DEFAULT,
open=mock.DEFAULT
)
def test_rebuild_repository(self, os, debfile, deb822, fcntl,
gzip, utils, open):
repo = gen_repository(name=("trusty", "main"), url="file:///repo")
package = gen_package(name="test", repository=repo)
os.path.join = lambda *x: "/".join(x)
utils.get_path_from_url = lambda x: x[7:]
files = [
mock.MagicMock(), # Packages, w
mock.MagicMock(), # Release, a+b
mock.MagicMock(), # Packages, rb
mock.MagicMock(), # Release, rb
mock.MagicMock() # Packages.gz, rb
]
open.side_effect = files
self.driver.rebuild_repository(repo, [package])
open.assert_any_call(
"/repo/dists/trusty/main/binary-amd64/Packages", "wb"
)
gzip.open.assert_called_once_with(
"/repo/dists/trusty/main/binary-amd64/Packages.gz", "wb"
)
debfile.DebFile.assert_called_once_with("/repo/test.pkg")
@mock.patch.multiple(
"packetary.drivers.deb_driver",
deb822=mock.DEFAULT,
gzip=mock.DEFAULT,
open=mock.DEFAULT,
os=mock.DEFAULT,
utils=mock.DEFAULT
)
def test_fork_repository(self, deb822, gzip, open, os, utils):
os.path.sep = "/"
os.path.join = lambda *x: "/".join(x)
utils.get_path_from_url = lambda x: x
repo = gen_repository(name=("trusty", "main"), url="http://localhost")
files = [
mock.MagicMock(),
mock.MagicMock()
]
open.side_effect = files
clone = self.driver.fork_repository(self.connection, repo, "/root")
self.assertEqual(repo.name, clone.name)
self.assertEqual(repo.architecture, clone.architecture)
self.assertEqual(repo.origin, clone.origin)
self.assertEqual("/root/", clone.url)
utils.ensure_dir_exist.assert_called_once_with(os.path.dirname())
open.assert_any_call(
"/root/dists/trusty/main/binary-amd64/Release", "wb"
)
open.assert_any_call(
"/root/dists/trusty/main/binary-amd64/Packages", "ab"
)
gzip.open.assert_called_once_with(
"/root/dists/trusty/main/binary-amd64/Packages.gz", "ab"
)
@mock.patch.multiple(
"packetary.drivers.deb_driver",
fcntl=mock.DEFAULT,
gzip=mock.DEFAULT,
open=mock.DEFAULT,
os=mock.DEFAULT,
utils=mock.DEFAULT
)
def test_update_suite_index(
self, os, fcntl, gzip, open, utils):
repo = gen_repository(name=("trusty", "main"), url="/repo")
files = [
mock.MagicMock(), # Release, a+b
mock.MagicMock(), # Packages, rb
mock.MagicMock(), # Release, rb
mock.MagicMock() # Packages.gz, rb
]
files[0].items.return_value = [
("SHA1", "invalid 1 main/binary-amd64/Packages\n"),
("Architectures", "i386"),
("Components", "restricted"),
]
os.path.join = lambda *x: "/".join(x)
open().__enter__.side_effect = files
utils.get_path_from_url.return_value = "/root"
utils.append_token_to_string.side_effect = [
"amd64 i386", "main restricted"
]
utils.get_size_and_checksum_for_files.return_value = (
(
"/root/dists/trusty/main/binary-amd64/{0}".format(name),
10,
(k + "_value" for k in deb_driver._CHECKSUM_METHODS)
)
for name in deb_driver._REPOSITORY_FILES
)
self.driver._update_suite_index(repo)
open.assert_any_call("/root/dists/trusty/Release", "a+b")
files[0].seek.assert_called_once_with(0)
files[0].truncate.assert_called_once_with(0)
files[0].write.assert_any_call(six.b("Architectures: amd64 i386\n"))
files[0].write.assert_any_call(six.b("Components: main restricted\n"))
for k in deb_driver._CHECKSUM_METHODS:
files[0].write.assert_any_call(six.b(
'{0}:\n'
' {1} 10 main/binary-amd64/Packages\n'
' {1} 10 main/binary-amd64/Release\n'
' {1} 10 main/binary-amd64/Packages.gz\n'
.format(k, k + "_value")
))
open.assert_any_call("/root/dists/trusty/Release", "a+b")
fcntl.flock.assert_any_call(files[0].fileno(), fcntl.LOCK_EX)
fcntl.flock.assert_any_call(files[0].fileno(), fcntl.LOCK_UN)


@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from packetary.library import utils
from packetary.tests import base
class TestLibraryUtils(base.TestCase):
def test_append_token_to_string(self):
self.assertEqual(
"v1 v2 v3",
utils.append_token_to_string("v2 v3", "v1")
)
self.assertEqual(
"v1",
utils.append_token_to_string("", "v1")
)
self.assertEqual(
"v1 v2 v3 v4",
utils.append_token_to_string('v1\tv2 v3', "v4")
)
self.assertEqual(
"v1 v2 v3",
utils.append_token_to_string('v1 v2 v3', "v1")
)
def test_composite_writer(self):
fds = [
mock.MagicMock(),
mock.MagicMock()
]
writer = utils.composite_writer(*fds)
writer(u"text1")
writer(b"text2")
for fd in fds:
fd.write.assert_any_call(b"text1")
fd.write.assert_any_call(b"text2")
@mock.patch.multiple(
"packetary.library.utils",
os=mock.DEFAULT,
open=mock.DEFAULT
)
def test_get_size_and_checksum_for_files(self, os, open):
files = [
"/file1.txt",
"/file2.txt"
]
os.fstat.side_effect = [
mock.MagicMock(st_size=1),
mock.MagicMock(st_size=2)
]
r = list(utils.get_size_and_checksum_for_files(
files, mock.MagicMock(side_effect=["1", "2"])
))
self.assertEqual(
[("/file1.txt", 1, "1"), ("/file2.txt", 2, "2")],
r
)
def test_get_path_from_url(self):
self.assertEqual(
"/a/f.txt",
utils.get_path_from_url("/a/f.txt")
)
self.assertEqual(
"/a/f.txt",
utils.get_path_from_url("file:///a/f.txt?size=1")
)
with self.assertRaises(ValueError):
utils.get_path_from_url("http:///a/f.txt")
@mock.patch("packetary.library.utils.os")
def test_ensure_dir_exist(self, os):
os.makedirs.side_effect = [
True,
OSError(utils.errno.EEXIST, ""),
OSError(utils.errno.EACCES, ""),
ValueError()
]
utils.ensure_dir_exist("/nonexisted")
os.makedirs.assert_called_with("/nonexisted")
utils.ensure_dir_exist("/existed")
os.makedirs.assert_called_with("/existed")
with self.assertRaises(OSError):
utils.ensure_dir_exist("/private")
with self.assertRaises(ValueError):
utils.ensure_dir_exist(1)


@@ -134,10 +134,10 @@ class TestRepositoryController(base.TestCase):
gen_repository(name="test2")
]
clones = [copy.copy(x) for x in repos]
self.driver.clone_repository.side_effect = clones
self.driver.fork_repository.side_effect = clones
mirrors = self.ctrl.clone_repositories(repos, "./repo")
for r in repos:
self.driver.clone_repository.assert_any_call(
self.driver.fork_repository.assert_any_call(
self.context.connection, r, "/root/repo", False, False
)
self.assertEqual(mirrors, dict(zip(repos, clones)))


@@ -14,11 +14,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import gzip
import six
from packetary.library import streams
from packetary.tests import base
from packetary.tests.stubs.helpers import get_compressed
class TestBufferedStream(base.TestCase):
@@ -57,16 +57,12 @@ class TestBufferedStream(base.TestCase):
class TestGzipDecompress(base.TestCase):
@classmethod
def setUpClass(cls):
cls.gzipped = six.BytesIO()
gz = gzip.GzipFile(fileobj=cls.gzipped, mode="w")
gz.write(b"line1\nline2\nline3\n")
gz.flush()
gz.close()
cls.compressed = get_compressed(six.BytesIO(b"line1\nline2\nline3\n"))
def setUp(self):
super(TestGzipDecompress, self).setUp()
self.gzipped.seek(0)
self.stream = streams.GzipDecompress(self.gzipped)
self.compressed.reset()
self.stream = streams.GzipDecompress(self.compressed)
def test_read(self):
chunk = self.stream.read(5)


@@ -9,3 +9,4 @@ bintrees>=2.0.2
chardet>=2.3.0
stevedore>=1.1.0
six>=1.5.2
python-debian>=0.1.23


@@ -25,6 +25,13 @@ classifier =
packages =
packetary
[entry_points]
console_scripts =
packetary=packetary.cli.app:main
packetary.drivers =
deb=packetary.drivers.deb_driver:DebRepositoryDriver
[build_sphinx]
source-dir = doc/source
build-dir = doc/build
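
For completeness, the new packetary.drivers entry point can be resolved with stevedore, which is already listed in requirements. The snippet below is only an illustration of standard entry-point loading; how packetary's own CLI wires the driver up may differ.

# Sketch: resolve the "deb" driver declared in setup.cfg via stevedore.
from stevedore import driver

manager = driver.DriverManager(
    namespace="packetary.drivers",  # entry-point group declared above
    name="deb",                     # selects DebRepositoryDriver
    invoke_on_load=True,            # instantiate the driver class
)
deb = manager.driver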