root/build-tools/stx/repo_manage.py
Scott Little ef7fa75417 Allow our repositories to have multiple versions of a package
The original code would download all verions names in our lst files,
upload them one at a time to Aptly, and as part of each upload it
would delete other versions of the package.  The end result is that
the last uploaded version is the only one remaining in Aptly.

Fix is to delete old versions of a package from Aptly as a
separate step, with full knowledge of all versions we need to keep.

Testing:

1) Add multiple versions of a package to a lst file.
Run downloader.  All expected versions are found
in Aptly.

2) Remove the extra version of the package.
Run downloader.  The extra, no longer needed package
version are removed from Aptly leaving only the one we want.

Story: 2010797
Task: 48697

Change-Id: I6de33d8d1c2bb5161c905765e7757102ea9b8cac
Signed-off-by: Scott Little <scott.little@windriver.com>
(cherry picked from commit 9b0a139466)
2023-10-05 00:41:20 +00:00

1027 lines
47 KiB
Python
Executable File

#!/usr/bin/python3
# codeing = utf-8
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright (C) 2021 WindRiver Corporation
import apt
import apt_pkg
import aptly_deb_usage
import argparse
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor
import debian.deb822
import debian.debfile
import logging
import os
import requests
from requests.compat import urljoin
import shutil
from threading import Lock
import urllib.request
import utils
REPOMGR_URL = os.environ.get('REPOMGR_URL')
REPOMGR_ORIGIN = os.environ.get('REPOMGR_ORIGIN')
REPOMGR_DEPLOY_URL = os.environ.get('REPOMGR_DEPLOY_URL')
APTFETCH_JOBS = 20
class AptFetch():
'''
Fetch Debian packages from a set of repositories.
'''
def __init__(self, logger, sources_list='', workdir='/tmp/apt-fetch'):
self.logger = logger
self.aptcache = None
self.aptlock = Lock()
self.workdir = workdir
self.sources_list = sources_list
self.__construct_workdir(sources_list)
self.__init_apt_cache()
def __construct_workdir(self, sources_list):
'''construct some directories for repo and temporary files'''
# In case the sources_list is specified:
#
# ├── apt-root # For apt cache
# │ └── etc
# │ └── apt
# │ └── sources.list
# └── downloads # Sub directory to store downloaded packages
# ├── binary
# └── source
#
# In case the sources_list is ''
# |
# └── downloads # Sub directory to store downloaded packages
# ├── binary
# └── source
basedir = self.workdir
try:
if os.path.exists(basedir):
shutil.rmtree(basedir)
os.makedirs(basedir)
if self.sources_list:
# check to see if meta file exist
if not os.path.exists(sources_list):
raise Exception('Specified sources list file %s does not exist' % sources_list)
aptdir = os.path.join(basedir, 'apt-root/etc/apt')
os.makedirs(aptdir)
shutil.copyfile(sources_list, os.path.join(aptdir, 'sources.list'))
os.makedirs(os.path.join(basedir, 'downloads', 'binary'))
os.makedirs(os.path.join(basedir, 'downloads', 'source'))
except Exception as e:
self.logger.error(str(e))
self.logger.error('Failed to construct workdir %s' % basedir)
raise Exception('Failed to construct workdir %s' % basedir)
def __init_apt_cache(self):
'''Construct APT cache based on specified rootpath. Just like `apt update` on host'''
# In case we use host's apt settings, no need to apt-upgrade.
if not self.sources_list:
self.aptcache = apt.Cache()
return None
try:
self.aptcache = apt.Cache(rootdir=os.path.join(self.workdir, 'apt-root'))
ret = self.aptcache.update()
if not ret:
raise Exception('APT cache update failed')
self.aptcache.open()
except Exception as e:
self.logger.error(str(e))
self.logger.error('apt cache init failed.')
raise Exception('apt cache init failed.')
# Download a binary package into downloaded folder
def fetch_deb(self, pkg_name, pkg_version):
'''Download a binary package'''
ret = ''
if not pkg_name or not pkg_version:
return ret
self.logger.info("Current downloading:%s:%s", pkg_name, pkg_version)
destdir = os.path.join(self.workdir, 'downloads', 'binary')
self.aptlock.acquire()
try:
pkg = self.aptcache[pkg_name]
if not pkg:
self.aptlock.release()
self.logger.error("Failed to find binary package %s", pkg_name)
return ret
default_candidate = pkg.candidate
self.logger.debug("The default candidate is %s", default_candidate.version)
candidate = pkg.versions.get(pkg_version)
if not candidate:
if ':' in default_candidate.version:
epoch, ver = default_candidate.version.split(':')
if epoch.isdigit() and ver == pkg_version:
self.logger.debug('epoch %s will be skipped for %s_%s', epoch, pkg_name, ver)
candidate = default_candidate
if not candidate:
self.aptlock.release()
self.logger.error("Failed to found the matched version %s for %s", pkg_version, pkg_name)
return ret
except Exception as e:
self.aptlock.release()
self.logger.error("Exception during candidate searching:%s", str(e))
return ret
uri = candidate.uri
filename = candidate.filename
self.aptlock.release()
try:
res = requests.get(uri, stream=True)
self.logger.debug('Fetching package file %s' % uri)
with open(os.path.join(destdir, os.path.basename(filename)), 'wb') as download_file:
for chunk in res.iter_content(chunk_size=1024 * 1024):
if chunk:
download_file.write(chunk)
except Exception as e:
self.logger.error(str(e))
self.logger.error('Binary package %s %s download error' % (pkg_name, pkg_version))
return ' '.join(['DEB-F', pkg_name, pkg_version]).strip()
else:
self.logger.debug('Binary package %s %s downloaded.' % (pkg_name, pkg_version))
return ' '.join(['DEB', pkg_name, pkg_version]).strip()
# Download a source package into downloaded folder
def fetch_dsc(self, pkg_name, pkg_version=''):
'''Download a source package'''
if not pkg_name:
raise Exception('Source package name empty')
destdir = os.path.join(self.workdir, 'downloads', 'source')
self.aptlock.acquire()
src = apt_pkg.SourceRecords()
source_find = False
source_lookup = src.lookup(pkg_name)
while source_lookup:
if pkg_name == src.package and pkg_version in ['', src.version]:
source_find = True
break
source_lookup = src.lookup(pkg_name)
if not source_find:
self.aptlock.release()
raise ValueError("Source package %s %s was not found" % (pkg_name, pkg_version))
dict_files = dict()
for src_file in src.files:
dict_files[src_file.path] = src.index.archive_uri(src_file.path)
self.aptlock.release()
# Here the src.files is a list, each one points to a source file
# Download those source files one by one with requests
try:
for file_path, uri in dict_files.items():
res = requests.get(uri, stream=True)
self.logger.debug('Fetch package file %s' % uri)
with open(os.path.join(destdir, os.path.basename(file_path)), 'wb') as download_file:
for chunk in res.iter_content(chunk_size=1024 * 1024):
if chunk:
download_file.write(chunk)
except Exception as e:
self.logger.error(str(e))
self.logger.error('Source package %s %s download error' % (pkg_name, pkg_version))
return ' '.join(['DSC-F', pkg_name, pkg_version]).strip()
else:
self.logger.debug('Source package %s %s downloaded.' % (pkg_name, pkg_version))
return ' '.join(['DSC', pkg_name, pkg_version]).strip()
# Download a bundle of packages into downloaded folder
# deb_list: binary package list file
# dsc_list: source package list file
def fetch_pkg_list(self, deb_set=set(), dsc_set=set()):
'''Download a bundle of packages specified through deb_list and dsc_list.'''
if not deb_set and not dsc_set:
raise Exception('deb_list and dsc_list, at least one is required.')
fetch_result = dict()
fetch_result['deb'] = list()
fetch_result['deb-failed'] = list()
fetch_result['dsc'] = list()
fetch_result['dsc-failed'] = list()
with ThreadPoolExecutor(max_workers=APTFETCH_JOBS) as threads:
obj_list = []
# Download binary packages
for pkg_ver in deb_set:
pkg_name = pkg_ver.split()[0]
if len(pkg_ver.split()) == 1:
pkg_version = ''
else:
pkg_version = pkg_ver.split()[1]
obj = threads.submit(self.fetch_deb, pkg_name, pkg_version)
if not obj:
self.logger.error("Failed to download %s:%s", pkg_name, pkg_version)
else:
obj_list.append(obj)
# Download source packages
for pkg_ver in dsc_set:
pkg_name = pkg_ver.split()[0]
if len(pkg_ver.split()) == 1:
pkg_version = ''
else:
pkg_version = pkg_ver.split()[1]
obj = threads.submit(self.fetch_dsc, pkg_name, pkg_version)
obj_list.append(obj)
# Wait...
for future in as_completed(obj_list):
ret = future.result()
if ret:
if ret.startswith('DEB'):
fetch_result['deb'].append(ret.lstrip('DEB '))
elif ret.startswith('DEB-F'):
fetch_result['deb-failed'].append(ret.lstrip('DEB-F '))
elif ret.startswith('DSC'):
fetch_result['dsc'].append(ret.lstrip('DSC '))
elif ret.startswith('DSC-F'):
fetch_result['dsc-failed'].append(ret.lstrip('DSC-F '))
return fetch_result
class RepoMgr():
'''
Repository management, based on pulp or aptly, mainly used for OBS and LAT
Two kind of repositories: local repo and remote one.
remote repo: mirror of another repository. shouldn't insert or remove packages from it..
local repo: a local repository, we can insert/remove packages into/from them.
'''
def __init__(self, repoType, repoURL, workdir, origin, logger):
if repoType == 'aptly':
self.repo = aptly_deb_usage.Deb_aptly(repoURL, origin, logger)
else:
raise Exception('Currently, only aptly repository supported')
self.aptcache = None
self.logger = logger
self.workdir = workdir
def __sync_pkg_list(self, check_list, pkg_list_file, pkg_type='binary'):
'''Sync packages, return packages need to be downloaded'''
if pkg_type not in ['binary', 'source']:
self.logger.error('Parameter pkg_tye:%s error. Can only be binary or source' % pkg_type)
raise Exception('Parameter pkg_tye:%s error. Can only be binary or source' % pkg_type)
pkg_req = set()
# scan the package list file
pkg_set = self.__scan_pkg_list(pkg_list_file)
for pkg_ver in pkg_set:
pkg_name = pkg_ver.split()[0]
if len(pkg_ver.split()) == 1:
pkg_version = ''
else:
pkg_version = pkg_ver.split()[1]
# Search package in check_list
if self.repo.pkg_exist(check_list, pkg_name, pkg_type, pkg_version):
continue
pkg_req.add(pkg_ver)
self.logger.info('%d %s packages need to be downloaded.' % (len(pkg_req), pkg_type))
return pkg_req
# Scan package list file, return required package_version in a set
def __scan_pkg_list(self, pkg_list_file):
pkg_set = set()
try:
with open(pkg_list_file, 'r') as f_list:
lines = f_list.readlines()
except Exception as e:
self.logger.error(str(e))
self.logger.error('Package list file %s read error.' % pkg_list_file)
raise Exception('Package list file %s read error.' % pkg_list_file)
else:
for line in lines:
pkg_info = line.split('#')[0].split()
if not pkg_info:
continue
if len(pkg_info) == 1:
pkg_set.add(pkg_info[0])
else:
pkg_set.add(pkg_info[0] + ' ' + pkg_info[1])
# If xxx_1.2.3 exist, remove xxx.
tmp_set = pkg_set.copy()
for pkg_ver in tmp_set:
if pkg_ver.split()[0] in pkg_set:
pkg_set.remove(pkg_ver.split()[0])
return pkg_set
# Download a bundle of packages and deployed them through repository
# repo_name: the repository used to deploy these packages
# no_clear: Not delete downloaded package files, for debug
# kwarge:sources_list: Where we should fetch packages from.
# kwarge:deb_list: file contains binary package list
# kwarge:dsc_ist: file contains source package list
# Output: None
def download(self, repo_name, no_clear=False, **kwargs):
'''Download specified packages and deploy them through a specified local repo.'''
if 'deb_list' not in kwargs.keys():
deb_list_file = ''
else:
deb_list_file = kwargs['deb_list']
if 'dsc_list' not in kwargs.keys():
dsc_list_file = ''
else:
dsc_list_file = kwargs['dsc_list']
if not deb_list_file and not dsc_list_file:
raise Exception('deb_list and dsc_list, at least one is required.')
if not repo_name == '':
# No matter if any packages can be download, create related repository firstly.
if not self.repo.create_local(repo_name):
raise Exception('Local repo created failed, Please double check if it already exists.')
# Scan deb/dsc_list_file, get required packages
deb_set = set()
dsc_set = set()
if deb_list_file:
deb_set = self.__scan_pkg_list(deb_list_file)
self.logger.info('%d binary packages will be downloaded.' % len(deb_set))
if dsc_list_file:
dsc_set = self.__scan_pkg_list(dsc_list_file)
self.logger.info('%d source packages will be downloaded.' % len(dsc_set))
# Download packages
apt_fetch = AptFetch(self.logger, kwargs['sources_list'], self.workdir)
fetch_result = apt_fetch.fetch_pkg_list(deb_set=deb_set, dsc_set=dsc_set)
if fetch_result['deb'] or fetch_result['deb-failed']:
self.logger.info('Download %d binary packages' % len(fetch_result['deb']))
self.logger.info('%d binary packages download failed' % len(fetch_result['deb-failed']))
for pkg_ver in fetch_result['deb']:
self.logger.debug('Binary package %s' % pkg_ver)
for pkg_ver in fetch_result['deb-failed']:
self.logger.info('Failed to download binary package %s' % pkg_ver)
if fetch_result['dsc'] or fetch_result['dsc-failed']:
self.logger.info('Download %d source packages' % len(fetch_result['dsc']))
self.logger.info('%d source packages download failed' % len(fetch_result['dsc-failed']))
for pkg_ver in fetch_result['dsc']:
self.logger.debug('Source package %s' % pkg_ver)
for pkg_ver in fetch_result['dsc-failed']:
self.logger.info('Failed to download source package %s' % pkg_ver)
if repo_name == '':
self.logger.info('All packages are downloaded to %s', self.workdir)
return
# Add packages into local repo
pkg_folder = os.path.join(self.workdir, 'downloads', 'binary')
package_files = set()
for filename in os.listdir(pkg_folder):
package_files.add(os.path.join(pkg_folder, filename))
if package_files:
self.repo.upload_pkg_local(package_files, repo_name)
pkg_folder = os.path.join(self.workdir, 'downloads', 'source')
for filename in os.listdir(pkg_folder):
if filename.endswith('.dsc'):
self.upload_pkg(repo_name, os.path.join(pkg_folder, filename))
# Deploy local repo
repo_str = self.repo.deploy_local(repo_name)
if not no_clear:
shutil.rmtree(self.workdir)
self.logger.info('New local repo can be accessed through: %s' % repo_str)
# We need a bundle packages been deployed through a serials of repositories
# We have:
# -) a serials of atply/pulp repositories already contains some packages;
# -) an aptly/pulp repository can be used to deploy downloaded packages
# -) a serials of upstream Debian repository
# -) Two text files list binary and source packages we need
#
# SYNC will download all packages we haven't and deployed them through
# a local repository.
# reponame: name of a local repository used to deploy downloaded packages
# repo_list: String separated with space, contains serials of aptly/pulp
# repos can be used for OBS/LAT.
# no_clear: do not delete downloaded packages. For debug
# kwargs:sources_list: file contains trusted upstream repositories
# kwargs:deb_list: file lists all needed binary packages
# kwargs:dsc_list: file lists all needed source packages
# Output: None
def sync(self, repo_name, repo_list, no_clear=False, **kwargs):
'''
Sync a set of repositories with specified package lists, if any packages
missed, download and deploy them through a specified local repo
'''
if 'deb_list' not in kwargs.keys():
deb_list = ''
else:
deb_list = kwargs['deb_list']
if 'dsc_list' not in kwargs.keys():
dsc_list = ''
else:
dsc_list = kwargs['dsc_list']
if not deb_list and not dsc_list:
raise Exception('deb_list and dsc_list, at least one is required.')
# construct repo list will be checked
local_list = self.repo.list_local(quiet=True)
remote_list = self.repo.list_remotes(quiet=True)
# Specified local repo must exist, or failed
if repo_name not in local_list:
self.logger.info('Sync, local repo %s does not exist, create it.' % repo_name)
if not self.repo.create_local(repo_name):
raise Exception('Local repo %s created failed.' % repo_name)
# Make sure all repos in check_list exist in aptly/pulp database.
check_list = [repo_name]
for repo in repo_list.split():
if repo in local_list or repo in remote_list:
if repo not in check_list:
check_list.append(repo)
else:
self.logger.warning('%s in the list but does not exists.' % repo)
# Download missing packages from remote repo
deb_set = set()
dsc_set = set()
if deb_list:
deb_set = self.__sync_pkg_list(check_list, deb_list, 'binary')
if dsc_list:
dsc_set = self.__sync_pkg_list(check_list, dsc_list, 'source')
# Download packages
if deb_set or dsc_set:
apt_fetch = AptFetch(self.logger, kwargs['sources_list'], self.workdir)
fetch_result = apt_fetch.fetch_pkg_list(deb_set=deb_set, dsc_set=dsc_set)
if fetch_result['deb'] or fetch_result['deb-failed']:
self.logger.info('Download %d binary packages' % len(fetch_result['deb']))
self.logger.info('%d binary packages download failed' % len(fetch_result['deb-failed']))
for pkg_ver in fetch_result['deb']:
self.logger.debug('Binary package %s' % pkg_ver)
for pkg_ver in fetch_result['deb-failed']:
self.logger.info('Failed to download binary package %s' % pkg_ver)
if fetch_result['dsc'] or fetch_result['dsc-failed']:
self.logger.info('Download %d source packages' % len(fetch_result['dsc']))
self.logger.info('%d source packages download failed' % len(fetch_result['dsc-failed']))
for pkg_ver in fetch_result['dsc']:
self.logger.debug('Source package %s' % pkg_ver)
for pkg_ver in fetch_result['dsc-failed']:
self.logger.info('Failed to download source package %s' % pkg_ver)
# Add packages into local repo
pkg_folder = os.path.join(self.workdir, 'downloads', 'binary')
package_files = set()
for filename in os.listdir(pkg_folder):
package_files.add(os.path.join(pkg_folder, filename))
self.repo.upload_pkg_local(package_files, repo_name)
pkg_folder = os.path.join(self.workdir, 'downloads', 'source')
for filename in os.listdir(pkg_folder):
if filename.endswith('.dsc'):
self.upload_pkg(repo_name, os.path.join(pkg_folder, filename))
self.repo.upload_pkg_local(package_files, repo_name)
# Deploy local repo
repo_str = self.repo.deploy_local(repo_name)
if not no_clear:
try:
if os.path.exists(self.workdir):
shutil.rmtree(self.workdir)
except Exception as e:
self.logger.error(str(e))
self.logger.error('Clear work folder %s failed.' % self.workdir)
raise Exception('Clear work folder %s failed.' % self.workdir)
self.logger.info('local repo can be accessed through: %s ' % repo_str)
# Merge all packages of several repositories into a new publication(aptly)
# NOTE: aptly only. Not find similar feature in pulp...
def merge(self, name, source_snapshots):
'''Merge several repositories into a new aptly publication.'''
return self.repo.merge_repos(name, source_snapshots.split(','))
# Construct a repository mirror to an upstream Debian repository
# kwargs:url: URL of the upstream repo (http://deb.debian.org/debian)
# kwargs:distribution: the distribution of the repo (bullseye)
# kwargs:component: component of the repo (main)
# kwargs:architecture: architecture of the repo, "all" is always enabled. (amd64)
# kwargs:with_sources: include source packages, default is False.
# Output: None
def mirror(self, repo_name, **kwargs):
'''Construct a mirror based on a debian repository.'''
url = kwargs['url']
distribution = kwargs['distribution']
component = kwargs['component']
architectures = kwargs['architectures']
if 'with_sources' not in kwargs.keys():
with_sources = False
else:
with_sources = kwargs['with_sources']
self.repo.remove_remote(repo_name)
if with_sources:
self.repo.create_remote(repo_name, url, distribution,
components=[component],
architectures=[architectures],
with_sources=True)
else:
self.repo.create_remote(repo_name, url, distribution,
components=[component],
architectures=[architectures])
repo_str = self.repo.deploy_remote(repo_name)
if repo_str != None:
self.logger.info('New mirror can be accessed through: %s' % repo_str)
return True
else:
self.logger.error('Create mirror failed %s : %s' % (repo_name, url))
return False
# List all repositories
# Output: None
def list(self):
'''List all repos.'''
self.repo.list_remotes()
self.repo.list_local()
# Clean all packages and repositories
def clean(self):
'''Clear all meta files. Construct a clean environment'''
self.repo.clean_all()
# Copy specified packages from one repository to another
def copy_pkgs(self, source, dest, pkg_names, pkg_type='binary', deploy=True, overwrite=True):
'''Copy specified packages from source repository to destination repository.'''
if pkg_type != 'binary' and pkg_type != 'source':
self.logger.error('The package type must be binary or source')
return False
if source == dest:
self.logger.error('%s and %s are the same repository.' % (source, dest))
return False
if pkg_type != 'binary' and pkg_type != 'source':
self.logger.error('The package type must be binary or source')
return False
pkg_list = list()
for pkg in pkg_names.split(','):
pkg_list.append(pkg.strip())
ret = self.repo.copy_pkgs(source, dest, pkg_list, pkg_type)
if not ret:
self.logger.error('Copy %s packages from %s to %s failed' % (pkg_type, source, dest))
return False
elif deploy:
self.repo.deploy_local(dest)
return True
# list a repository
# repo_name: the name of the repo been listed.
# Output: True is all works in order
def list_pkgs(self, repo_name, quiet=False):
'''List a specified repository.'''
local_list = self.repo.list_local(quiet=True)
remote_list = self.repo.list_remotes(quiet=True)
pkg_list = []
for repo in local_list:
if repo == repo_name:
self.logger.info('List a local repo')
pkgs = self.repo.pkg_list([repo])
pkg_list.extend(pkgs)
if not quiet:
self.logger.info("Local repo %s:" % repo_name)
for pkg in sorted(pkgs):
self.logger.info(" %s" % pkg)
for repo in remote_list:
if repo == repo_name:
self.logger.info('List a remote mirror')
pkgs = self.repo.pkg_list([repo])
pkg_list.extend(pkgs)
if not quiet:
self.logger.info("Remote repo %s:" % repo_name)
for pkg in sorted(pkgs):
self.logger.info(" %s" % pkg)
return pkg_list
# delete a repository
# repo_name: the name of the repo been deleted.
# Output: True is all works in order
def remove_repo(self, repo_name):
'''Remove a specified repository.'''
local_list = self.repo.list_local(quiet=True)
remote_list = self.repo.list_remotes(quiet=True)
for repo in local_list:
if repo == repo_name:
self.logger.info('Remove a local repo')
self.repo.remove_local(repo)
return True
for repo in remote_list:
if repo == repo_name:
self.logger.info('Remove a remote mirror')
self.repo.remove_remote(repo)
return True
self.logger.warning("Remove repo failed: repo '%s' not found" % repo_name)
return False
# Before uploading a source package into a local repo, scan all repos,
# find all duplicate files with different size.
# dsc: Dsc data of the source package. <class 'debian.deb822.Dsc'>
# Return a dictionary: {repo_1: {file_a, ...}, ...}
# Input dsc contains file_a, while repo_1 also contains such a file
# with different size.
def __check_orig_files(self, dsc):
different_files = {}
repo_list = self.repo.list_local(quiet=True)
repo_list += self.repo.list_remotes(quiet=True)
for repo in repo_list:
for meta_file in dsc['Files']:
if meta_file['name'].find('.orig.'):
new_file_size = meta_file['size']
if dsc['Source'].startswith('lib'):
prefix_dir = dsc['Source'][:4] + '/' + dsc['Source']
else:
prefix_dir = dsc['Source'][0] + '/' + dsc['Source']
target_path = repo + '/pool/main/' + prefix_dir + '/' + meta_file['name']
target_url = urljoin(REPOMGR_DEPLOY_URL, target_path)
try:
orig_file = urllib.request.urlopen(target_url)
except Exception:
# no such file in repo, that is good
self.logger.debug('%s does not contain %s' % (repo, meta_file['name']))
continue
if orig_file.length != int(new_file_size):
self.logger.debug('File %s is not same as the one in %s.' %
(meta_file['name'], repo))
if repo not in different_files.keys():
different_files[repo] = {meta_file['name']}
else:
different_files[repo].add(meta_file['name'])
return different_files
# upload a Debian package and deploy it
# repo_name: the name of the repository used to contain and deploy the package
# package: pathname of the package(xxx.deb or xxx.dsc) to be uploaded
# deploy: If True, deploy the repository after "binary" package been uploaded.
# Output: True if all works.
def upload_pkg(self, repo_name, package, deploy=True):
'''Upload a Debian package into a specified repository.'''
self.logger.info("upload_pkg: %s to %s", package, repo_name)
local_list = self.repo.list_local(quiet=True)
if repo_name not in local_list:
self.logger.info('upload_pkg: repository %s does not exist, creating it.' % repo_name)
self.repo.create_local(repo_name)
if not package and deploy:
# No repo found, no package specified, just create & deploy the repo and return
self.repo.deploy_local(repo_name)
return True
self.logger.debug('upload_pkg: upload package %s into %s' % (package, repo_name))
if '.deb' == os.path.splitext(package)[-1]:
pkg_type = 'binary'
try:
deb = debian.debfile.DebFile(package, 'r').debcontrol()
except Exception as e:
self.logger.error('Error: %s' % e)
self.logger.error('Binary package %s read error.' % package)
raise Exception('Binary package error.')
pkg_version = deb['Version']
pkg_name = deb['Package']
self.repo.upload_pkg_local({package}, repo_name)
elif '.dsc' == os.path.splitext(package)[-1]:
pkg_type = 'source'
try:
dsc = debian.deb822.Dsc(open(package, 'r'))
except Exception as e:
self.logger.error('Error: %s' % e)
self.logger.error('Source package %s read error.' % package)
raise Exception('Source package error.')
pkg_name = dsc['Source']
pkg_version = dsc['Version']
# In case there is already an *.orig.* file with different size in any repos,
# refuse to upload it.
different_files = self.__check_orig_files(dsc)
if different_files:
for repo, meta_files in different_files.items():
self.logger.error('%s contains different file: %s' % (repo, str(meta_files)))
self.logger.error('Package %s upload failed. Repo %s already contains '
'file %s with different content.' %
(package, repo, str(meta_files)))
return False
pkg_files = set()
pkg_files.add(package)
for meta_file in dsc['Files']:
pkg_files.add(os.path.join(os.path.dirname(package), meta_file['name']))
self.repo.upload_pkg_local(pkg_files, repo_name)
else:
self.logger.warning('Only Debian style files, like deb and dsc, are supported.')
return False
self.logger.debug('upload_pkg: package %s been uploaded into %s' % (package, repo_name))
# deploy = False only effect on binary package.
if 'binary' == pkg_type and not deploy:
return True
self.repo.deploy_local(repo_name)
# Double check if the package been uploaded successfully
if not self.search_pkg(repo_name, pkg_name, pkg_version, pkg_type == 'binary'):
self.logger.error('upload_pkg: verify failed, no %s package %s %s in %s'
% (pkg_type, pkg_name, pkg_version, repo_name))
return False
return True
# search a package from a repository
# repo_name: name of the repository to search the package in
# pkg_name: name of the Debian package to be searched
# pkg_version: version number of the package to be searched
# binary: binary package or source one?
# Output: True if find, or False
def search_pkg(self, repo_name, pkg_name, pkg_version=None, binary=True):
'''Find a package from a specified repo.'''
repo_find = False
repo = None
r_list = self.repo.list_local(quiet=True)
for repo in r_list:
if repo == repo_name:
repo_find = True
r_list = self.repo.list_remotes(quiet=True)
for repo in r_list:
if repo == repo_name:
repo_find = True
if not repo_find:
self.logger.error('Search package, repository does not exist.')
return False
if binary:
pkg_type = 'binary'
else:
pkg_type = 'source'
if not self.repo.pkg_exist([repo_name], pkg_name, pkg_type, pkg_version):
self.logger.info('Search %s package %s, not found.' % (pkg_type, pkg_name))
return False
return True
# Deploy a local repository
def deploy_repo(self, repo_name, suffix=''):
'''Deploy a local repository manually.'''
local_list = self.repo.list_local(quiet=True)
if repo_name not in local_list:
self.logger.error('Local repository deploy failed, %s does not exist.' % repo_name)
return
return self.repo.deploy_local(repo_name, suffix)
# Delete a Debian package from a local repository
# repo_name: name of the LOCAL repository to delete the package from
# pkg_name: name of the binary package to be deleted
# pkg_type: 'source' or 'binary'
# pkg_version: version number of the package to be deleted
# Output: True if find and delete, or False
def delete_pkg(self, repo_name, pkg_name, pkg_type, pkg_version='', deploy=True):
'''Find and delete a binary package from a specified local repo.'''
repo_find = False
repo = None
if pkg_type not in {'binary', 'source'}:
self.logger.error('Delete package, pkg_type must be one of '
'either "binary" or "source".')
return False
if not repo_name.startswith(aptly_deb_usage.PREFIX_LOCAL):
self.logger.error('Delete package, only local repositories support this operation.')
return False
local_list = self.repo.list_local(quiet=True)
for repo in local_list:
if repo == repo_name:
repo_find = True
if not repo_find:
self.logger.error('Delete package, repository does not exist.')
return False
if not self.repo.pkg_exist([repo_name], pkg_name, pkg_type, pkg_version):
self.logger.info('Delete package, package not found.')
return False
self.repo.delete_pkg_local(repo_name, pkg_name, pkg_type, pkg_version)
# deploy = False only effect on binary packages
if 'binary' == pkg_type and not deploy:
return True
self.repo.deploy_local(repo_name)
return True
# Simple example on using this class.
applogger = logging.getLogger('repomgr')
utils.set_logger(applogger)
def _handleDownload(args):
repomgr = RepoMgr('aptly', REPOMGR_URL, args.basedir, REPOMGR_ORIGIN, applogger)
kwargs = {'sources_list': args.sources_list, 'deb_list': args.deb_list,
'dsc_list': args.dsc_list}
repomgr.download(args.repository, **kwargs, no_clear=args.no_clear)
def _handleSync(args):
repomgr = RepoMgr('aptly', REPOMGR_URL, args.basedir, REPOMGR_ORIGIN, applogger)
kwargs = {'sources_list': args.sources_list, 'deb_list': args.deb_list,
'dsc_list': args.dsc_list}
repomgr.sync(args.repository, args.repo_list, **kwargs, no_clear=args.no_clear)
def _handleMirror(args):
repomgr = RepoMgr('aptly', REPOMGR_URL, '/tmp', REPOMGR_ORIGIN, applogger)
kwargs = {'url': args.url, 'distribution': args.distribution, 'component': args.component,
'architectures': args.architectures, 'with_sources': args.with_sources}
repomgr.mirror(args.repository, **kwargs)
def _handleMerge(args):
repomgr = RepoMgr('aptly', REPOMGR_URL, '/tmp', REPOMGR_ORIGIN, applogger)
repomgr.merge(args.name, args.repo_list)
def _handleCopyPkg(args):
repomgr = RepoMgr('aptly', REPOMGR_URL, '/tmp', REPOMGR_ORIGIN, applogger)
repomgr.copy_pkgs(args.source, args.destination, args.package_name, deploy=True, pkg_type=args.package_type)
def _handleUploadPkg(args):
repomgr = RepoMgr('aptly', REPOMGR_URL, '/tmp', REPOMGR_ORIGIN, applogger)
repomgr.upload_pkg(args.repository, args.package)
def _handleDeletePkg(args):
repomgr = RepoMgr('aptly', REPOMGR_URL, '/tmp', REPOMGR_ORIGIN, applogger)
repomgr.delete_pkg(args.repository, args.package_name, args.package_type,
pkg_version=args.package_version)
def _handleSearchPkg(args):
repomgr = RepoMgr('aptly', REPOMGR_URL, '/tmp', REPOMGR_ORIGIN, applogger)
if args.package_type == 'binary':
repomgr.search_pkg(args.repository, args.package_name, pkg_version=args.package_version,
binary=True)
else:
repomgr.search_pkg(args.repository, args.package_name, pkg_version=args.package_version,
binary=False)
def _handleRemoveRope(args):
repomgr = RepoMgr('aptly', REPOMGR_URL, '/tmp', REPOMGR_ORIGIN, applogger)
repomgr.remove_repo(args.repository)
def _handleList(_args):
repomgr = RepoMgr('aptly', REPOMGR_URL, '/tmp', REPOMGR_ORIGIN, applogger)
repomgr.list()
def _handleListPkgs(args):
repomgr = RepoMgr('aptly', REPOMGR_URL, '/tmp', REPOMGR_ORIGIN, applogger)
repomgr.list_pkgs(args.repository)
def _handleClean(_args):
repomgr = RepoMgr('aptly', REPOMGR_URL, '/tmp', REPOMGR_ORIGIN, applogger)
repomgr.clean()
def subcmd_download(subparsers):
download_parser = subparsers.add_parser('download',
help='Download specified packages and deploy them '
'through a new build repository.\n\n')
download_parser.add_argument('--repository', '-r', help='Name of the local repository', required=False,
default='deb-local-down')
download_parser.add_argument('--deb_list', help='Binary package list file', required=False, default='')
download_parser.add_argument('--dsc_list', help='Source package list file', required=False, default='')
download_parser.add_argument('--basedir', help='Temporary folder to store packages', required=False,
default='/tmp/repomgr')
download_parser.add_argument('--sources_list', help='Upstream sources list file.', required=False,
default='')
download_parser.add_argument('--no-clear', help='Not remove temporary files', action='store_true')
download_parser.set_defaults(handle=_handleDownload)
def subcmd_sync(subparsers):
sync_parser = subparsers.add_parser('sync',
help='Sync a set of repositories with specified package'
'lists.\n\n')
sync_parser.add_argument('--repository', '-r', help='Name of the local repository', required=False,
default='deb-local-sync')
sync_parser.add_argument('--repo_list', '-l', help='a set of local repos', required=False,
default='')
sync_parser.add_argument('--deb_list', help='Binary package list file', required=False,
default='')
sync_parser.add_argument('--dsc_list', help='Source package list file', required=False,
default='')
sync_parser.add_argument('--basedir', help='Temporary folder to store packages', required=False,
default='/tmp/repomgr')
sync_parser.add_argument('--sources_list', help='Upstream sources list file', required=False,
default='')
sync_parser.add_argument('--no-clear', help='Not remove temporary files', action='store_true')
sync_parser.set_defaults(handle=_handleSync)
def subcmd_mirror(subparsers):
mirror_parser = subparsers.add_parser('mirror',
help='Construct a mirror based on a remote'
'repository.\n\n')
mirror_parser.add_argument('--repository', '-r', help='Name of the remote repository', required=False,
default='deb-remote-tmp')
mirror_parser.add_argument('--url', help='URL of remote repository', required=False,
default='http://nginx.org/packages/debian/')
mirror_parser.add_argument('--distribution', '-d', help='distribution name', required=False,
default='buster')
mirror_parser.add_argument('--component', '-c', help='component name', required=False,
default='nginx')
mirror_parser.add_argument('--architectures', '-a', help='architectures', required=False,
default='amd64')
mirror_parser.add_argument('--with-sources', '-s', help='include source packages',
action='store_true')
mirror_parser.set_defaults(handle=_handleMirror)
def subcmd_merge(subparsers):
merge_parser = subparsers.add_parser('merge',
help='Merge several repositories into a new publication.\n\n')
merge_parser.add_argument('--name', '-n', help='Name of the new merged publication', required=True)
merge_parser.add_argument('--repo_list', '-l', help='a set of repos, seperate by comma', required=True)
merge_parser.set_defaults(handle=_handleMerge)
def subcmd_copy_pkg(subparsers):
copy_pkg_parser = subparsers.add_parser('copy_pkg',
help='Copy specified packages from source repository into destination one.\n\n')
copy_pkg_parser.add_argument('--source', '-s', help='Name of the source repository', required=True)
copy_pkg_parser.add_argument('--destination', '-d', help='Name of the destination repository, must be a local repo', required=True)
copy_pkg_parser.add_argument('--package_name', '-p', help='Name of the package(s) to be copied, separate by comma', required=True)
copy_pkg_parser.add_argument('--package_type', '-t', help='Package type, "binary" or "source"', required=False, default='binary')
copy_pkg_parser.set_defaults(handle=_handleCopyPkg)
def main():
# command line arguments
parser = argparse.ArgumentParser(add_help=True,
description='Repository management Tool',
epilog='''Tips: Use %(prog)s --help to get help for all of '
'parameters\n\n''')
subparsers = parser.add_subparsers(title='Repo control Commands:',
help='sub-command for repo-ctl\n\n')
# Three functions for three sub commands: pylint checking(too-many-statements)
subcmd_download(subparsers)
subcmd_sync(subparsers)
subcmd_mirror(subparsers)
subcmd_merge(subparsers)
subcmd_copy_pkg(subparsers)
clean_parser = subparsers.add_parser('clean', help='Clear all aptly repos.\n\n')
clean_parser.set_defaults(handle=_handleClean)
remove_repo_parser = subparsers.add_parser('remove_repo',
help='Remove a specific repository.\n\n')
remove_repo_parser.add_argument('--repository', '-r',
help='Name of the repo to be removed')
remove_repo_parser.set_defaults(handle=_handleRemoveRope)
upload_pkg_parser = subparsers.add_parser('upload_pkg',
help='Upload a Debian package into a specific '
'repository.\n\n')
upload_pkg_parser.add_argument('--package', '-p',
help='Debian package to be uploaded.', required=False)
upload_pkg_parser.add_argument('--repository', '-r',
help='Repository used to deploy this package.')
upload_pkg_parser.set_defaults(handle=_handleUploadPkg)
delete_pkg_parser = subparsers.add_parser('delete_pkg',
help='Delete a specified Debian package from a '
'specified repository.\n\n')
delete_pkg_parser.add_argument('--package_name', '-p', help='Package name to be deleted.')
delete_pkg_parser.add_argument('--package_type', '-t',
help='Package type to be deleted, "binary" or "source".')
delete_pkg_parser.add_argument('--package_version', '-v',
help='Version number of the package.', required=False)
delete_pkg_parser.add_argument('--repository', '-r', help='Local Repository to delete the '
'package from.')
delete_pkg_parser.set_defaults(handle=_handleDeletePkg)
search_pkg_parser = subparsers.add_parser('search_pkg',
help='Search a specified package from a specified '
'repository.\n\n')
search_pkg_parser.add_argument('--package_name', '-p', help='package to be looking for.')
search_pkg_parser.add_argument('--package_version', '-v', help='The version of the package.',
required=False)
search_pkg_parser.add_argument('--repository', '-r',
help='The Repository to search the package in.')
search_pkg_parser.add_argument('--package_type', '-t', help='binary or source',
required=False)
search_pkg_parser.set_defaults(handle=_handleSearchPkg)
list_parser = subparsers.add_parser('list',
help='List all aptly repos, including local repo and '
'remote mirror.\n\n')
list_parser.set_defaults(handle=_handleList)
list_pkgs_parser = subparsers.add_parser('list_pkgs',
help='List contents of a specific repo.\n\n')
list_pkgs_parser.add_argument('--repository', '-r',
help='Name of the repo to be listed')
list_pkgs_parser.set_defaults(handle=_handleListPkgs)
args = parser.parse_args()
if hasattr(args, 'handle'):
args.handle(args)
else:
parser.print_help()
if __name__ == '__main__':
main()