0b24411a7c
Aptly repos are signed with a GPG key embedded in environment containers. That key expired today (2024-02-23). Key was replaced with a new one [1]; update key fingerprint in this repo to match. [1] https://review.opendev.org/c/starlingx/tools/+/910036 Depends-On: https://review.opendev.org/c/starlingx/tools/+/910036 Partial-Bug: 2054862 Signed-off-by: Davlet Panech <davlet.panech@windriver.com> Change-Id: I7d20a1e7276796e694f837f004b8133e28ae5de4
881 lines
41 KiB
Python
Executable File
881 lines
41 KiB
Python
Executable File
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
# Copyright (C) 2021-2022 WindRiver Corporation
|
|
#
|
|
# Requires aptly-api-client:
|
|
# https://github.com/masselstine/aptly-api-client
|
|
#
|
|
# Old document of relate RESTAPI:
|
|
# https://www.aptly.info/doc/api/
|
|
#
|
|
# Realization of its real RESTAPI(go)
|
|
# https://github.com/molior-dbs/aptly
|
|
from aptly_api import Client
|
|
from debian import debian_support
|
|
import os
|
|
import time
|
|
from typing import Optional
|
|
|
|
PREFIX_LOCAL = 'deb-local-'
|
|
PREFIX_REMOTE = 'deb-remote-'
|
|
PREFIX_MERGE = 'deb-merge-'
|
|
SIGN_KEY = '8C58D092AD39022571D1F57AFA689A0116E3E718'
|
|
SIGN_PASSWD = 'starlingx'
|
|
DEFAULT_TIMEOUT_COUNT = 1
|
|
STX_DIST = os.environ.get('STX_DIST')
|
|
|
|
# Class used to manage aptly data base, it can:
|
|
# create_remote: Create a repository link to a remote mirror
|
|
# deploy_remote: Sync and deploy a remote mirror
|
|
# list_remotes: List all remote repositories
|
|
# remove_remote: Delete a remote repository
|
|
# create_local: Create a local repository
|
|
# upload_pkg_local: Upload a deb package into a local repository
|
|
# delete_pkg_local: Remove a deb package from a local repository
|
|
# pkg_exist: Search a package in a set of repos
|
|
# copy_pkgs: Copy packages from one repo to another
|
|
# deploy_local: Deploy a local repository
|
|
# list_local: List all local repositories
|
|
# remove_local: Delete a local repository
|
|
# clean_all: Clean all meta data including repo, public, distribution, package, task
|
|
|
|
|
|
class Deb_aptly():
|
|
def __init__(self, url, origin, logger):
|
|
'''The basic interface to manage aptly database. '''
|
|
self.logger = logger
|
|
self.url = url
|
|
self.aptly = Client(self.url)
|
|
self.logger.info('Aptly connected, version: %s', self.aptly.misc.version())
|
|
if origin:
|
|
self.origin = origin.strip() or None
|
|
else:
|
|
self.origin = None
|
|
|
|
# Create a remote mirror(make sure the name has specified prefix)
|
|
# Input
|
|
# name: the name of the remote repo : PREFIX_REMOTE-xxx
|
|
# url: the base url of the remote mirror: http://nginx.org/packages/debian
|
|
# distributions: the distribution: buster
|
|
# components: components=['nginx']
|
|
# architectures: architectures=['i386', 'arm64']
|
|
# with_sources: with_sources=True
|
|
# Output: None or Class 'aptly_api.parts.mirrors.Mirror'
|
|
def create_remote(self, name, url, distribution, **kwargs):
|
|
'''Base on a Debian repository, construct a repo as its mirror'''
|
|
if not name.startswith(PREFIX_REMOTE):
|
|
self.logger.error('%s is not started with %s, Failed.', name, PREFIX_REMOTE)
|
|
raise ValueError('remote repository create failed: prefix error')
|
|
|
|
remote_list = self.aptly.mirrors.list()
|
|
for remote in remote_list:
|
|
if remote.name == name:
|
|
self.logger.warning('mirror %s already exists.', remote.name)
|
|
return None
|
|
extra_param = {}
|
|
extra_param['distribution'] = distribution
|
|
extra_param['ignore_signatures'] = True
|
|
for key, value in kwargs.items():
|
|
if key == 'components':
|
|
extra_param['components'] = value
|
|
if key == 'with_sources':
|
|
extra_param['with_sources'] = True
|
|
if key == 'architectures':
|
|
extra_param['architectures'] = value
|
|
# Not find good/small repository with udebs, not verified
|
|
# if key == 'with_udebs':
|
|
# extra_param['with_udebs'] = True
|
|
remote = self.aptly.mirrors.create(name, url, **extra_param)
|
|
return remote
|
|
|
|
# update a mirror called "name". Mirror exist.
|
|
# Return False if failed
|
|
def __update_mirror(self, name):
|
|
'''Sync the mirror, may take minutes, depends on the size of the mirror ans the network. '''
|
|
mirror_list = self.aptly.mirrors.list()
|
|
# Add variable mirror_find just to avoid W0631
|
|
mirror_find = False
|
|
for mirror in mirror_list:
|
|
if mirror.name == name:
|
|
mirror_find = True
|
|
break
|
|
if not mirror_find:
|
|
self.logger.warning('Publish failed for mirror %s not find', name)
|
|
return False
|
|
# Please do NOT add any parameters here beside "ignore_signatures=True", that may
|
|
# overwrite previous settings and get strange results.
|
|
task = self.aptly.mirrors.update(name=name, ignore_signatures=True)
|
|
task_state = self.__wait_for_task(task, 15)
|
|
if task_state == 'SUCCEEDED':
|
|
return True
|
|
else:
|
|
self.logger.warning('Mirror %s update failed: %s', name, task_state)
|
|
return False
|
|
|
|
# Create a snapshot based on several others
|
|
# name : string, the name of new build snapshot
|
|
# source_snapshots: list of snapshots to be merge, order matters, snapshot at front of
|
|
# list has higher priority than snapshot later in the list.
|
|
# For each package, only the one with higher version can be selected:
|
|
# Return False on failure
|
|
def __merge_snapshot(self, name, source_snapshots):
|
|
'''Merge several snapshots into one, prepare for later deploy.'''
|
|
if not name.startswith(PREFIX_MERGE):
|
|
self.logger.error('%s did not start with %s, Failed.' % (name, PREFIX_MERGE))
|
|
return False
|
|
package_refs = []
|
|
# package_uniq_dict[pkgname_arch] = [package.key, snapshot]
|
|
package_uniq_dict = dict()
|
|
source_snapshots = [x.strip() for x in source_snapshots if x.strip() != '']
|
|
# remove duplicates (keep order)
|
|
source_snapshots = list(dict.fromkeys(source_snapshots))
|
|
snap_list = self.aptly.snapshots.list()
|
|
for snapshot in source_snapshots:
|
|
snap_exist = False
|
|
for snap in snap_list:
|
|
if snap.name == snapshot:
|
|
snap_exist = True
|
|
package_list = self.aptly.snapshots.list_packages(snap.name, with_deps=False, detailed=False)
|
|
# Debug only
|
|
# package_list.sort()
|
|
# self.logger.debug('%s packages in repo %s' % (len(package_list), snapshot))
|
|
for package in package_list:
|
|
key_list = package.key.split()
|
|
# 0: pkg_arch 1: pkg_name 2: pkg_version 3: pkg_key of aptly
|
|
pkgname_arch = '_'.join([key_list[1], key_list[0]])
|
|
# Source packages are useless for LAT, ignore them.
|
|
if "Psource" == key_list[0]:
|
|
continue
|
|
# Check and drop duplicate packages
|
|
if pkgname_arch in package_uniq_dict.keys():
|
|
need_replace = False
|
|
orig_version = package_uniq_dict[pkgname_arch][0].split()[2]
|
|
if STX_DIST in orig_version and STX_DIST not in key_list[2]:
|
|
self.logger.warn('STX package %s %s has been eclipsed by upstream version %s' %
|
|
(pkgname_arch, orig_version, key_list[2]))
|
|
if debian_support.version_compare(key_list[2], orig_version) > 0:
|
|
self.logger.warn('Drop duplicate package: %s.' %
|
|
' of '.join(package_uniq_dict[pkgname_arch]))
|
|
package_refs.remove(package_uniq_dict[pkgname_arch][0])
|
|
package_refs.append(package.key)
|
|
package_uniq_dict[pkgname_arch] = [package.key, snapshot]
|
|
else:
|
|
self.logger.warn('Drop duplicate package: %s of %s.' % (package.key, snapshot))
|
|
continue
|
|
package_uniq_dict[pkgname_arch] = [package.key, snapshot]
|
|
package_refs.append(package.key)
|
|
break
|
|
if not snap_exist:
|
|
self.logger.error('snapshot %s does not exist, merge failed.' % snapshot)
|
|
return False
|
|
|
|
# Remove a same name publish if exists
|
|
# For exist snapshot called NAME, we will:
|
|
# 1, rename it to backup-NAME
|
|
# 2, Create a new snapshot: NAME
|
|
# 3, delete snapshot backup-name
|
|
backup_name = None
|
|
publish_list = self.aptly.publish.list()
|
|
for publish in publish_list:
|
|
if publish.prefix == name:
|
|
task = self.aptly.publish.drop(prefix=name, distribution=publish.distribution, force_delete=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop publication failed %s : %s' % (name, task_state))
|
|
return False
|
|
# Remove the backup snapshot if it exists
|
|
snap_list = self.aptly.snapshots.list()
|
|
for snap in snap_list:
|
|
if snap.name == 'backup-' + name:
|
|
backup_name = 'backup-' + name
|
|
task = self.aptly.snapshots.delete(snapshotname=backup_name, force=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop snapshot failed %s : %s' % (backup_name, task_state))
|
|
return False
|
|
# Rename the snapshot if it exists
|
|
for snap in snap_list:
|
|
if snap.name == name:
|
|
backup_name = 'backup-' + name
|
|
self.__wait_for_task(self.aptly.snapshots.update(name, backup_name))
|
|
|
|
# crate a snapshot with package_refs. Duplicate package_refs is harmless.
|
|
# Note: The key is "package_refs" instead of "source_snapshots", for function
|
|
# "create_from_packages", parameter "source_snapshots" almost has no means.
|
|
task = None
|
|
task = self.aptly.snapshots.create_from_packages(name, source_snapshots=source_snapshots, package_refs=package_refs)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
if backup_name:
|
|
self.__wait_for_task(self.aptly.snapshots.update(backup_name, name))
|
|
self.logger.warning('merge_snapshot: Snapshot for %s creation failed: %s. ' % (name, task_state))
|
|
return False
|
|
# Remove the backup snapshot if it is created above
|
|
if backup_name:
|
|
task = self.aptly.snapshots.delete(snapshotname=backup_name, force=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop snapshot failed %s : %s' % (backup_name, task_state))
|
|
return True
|
|
|
|
# Create a snapshot based on "name" with same name
|
|
# local: True ==> local_repo False ==> remote_mirror
|
|
# Return False if failed
|
|
def __create_snapshot(self, name, local):
|
|
'''For local-repo or remote-repo, create a snapshot for it, prepare for later deploy.'''
|
|
# Remove a same name publish if exists
|
|
# For exist snapshot called NAME, we will:
|
|
# 1, rename it to backup-NAME
|
|
# 2, Create a new snapshot: NAME
|
|
# 3, delete snapshot backup-name
|
|
backup_name = None
|
|
publish_list = self.aptly.publish.list()
|
|
for publish in publish_list:
|
|
if publish.prefix == name:
|
|
task = self.aptly.publish.drop(prefix=name, distribution=publish.distribution, force_delete=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Remove publication failed %s : %s' % (name, task_state))
|
|
# Rename the snapshot if exists
|
|
snap_list = self.aptly.snapshots.list()
|
|
|
|
exists = [snap for snap in snap_list if snap.name == name]
|
|
backup_exists = [snap for snap in snap_list if snap.name == 'backup-' + name]
|
|
if exists:
|
|
backup_name = 'backup-' + name
|
|
if backup_exists:
|
|
self.__wait_for_task(self.aptly.snapshots.delete(backup_name, force=True))
|
|
self.__wait_for_task(self.aptly.snapshots.update(name, backup_name))
|
|
|
|
# crate a snapshot
|
|
task = None
|
|
if local:
|
|
task = self.aptly.snapshots.create_from_repo(name, name)
|
|
else:
|
|
task = self.aptly.snapshots.create_from_mirror(name, name)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
if backup_name:
|
|
self.__wait_for_task(self.aptly.snapshots.update(backup_name, name))
|
|
self.logger.warning('create_snapshot: Snapshot for %s creation failed: %s.' % (name, task_state))
|
|
return False
|
|
if backup_name:
|
|
task = self.aptly.snapshots.delete(snapshotname=backup_name, force=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Remove snapshot failed %s : %s' % (backup_name, task_state))
|
|
return True
|
|
|
|
# Wait for an aptly task up to a maximum of "count" minutes.
|
|
# By dafault, wait for DEFAULT_TIMEOUT_COUNT minute(s).
|
|
# Return: SUCCEEDED, FAILED, TIMEOUTED, EINVAL
|
|
def __wait_for_task(self, task, count=DEFAULT_TIMEOUT_COUNT):
|
|
'''Wait for an aptly task for one or more minutes'''
|
|
if count not in range(1, 30):
|
|
self.logger.error('Requested wait of % minutes is greater than 30 minutes max wait.', count)
|
|
return 'EINVAL'
|
|
timeout_factor = os.environ.get('REPOMGR_REQ_TIMEOUT_FACTOR')
|
|
if timeout_factor and timeout_factor.isdigit() and int(timeout_factor) != 0:
|
|
count *= int(timeout_factor)
|
|
while count > 0:
|
|
count -= 1
|
|
try:
|
|
# Function wait_for_task_by_id will return in 60 seconds, or timeout.
|
|
self.aptly.tasks.wait_for_task_by_id(task.id)
|
|
except Exception as e:
|
|
if count > 0:
|
|
self.logger.debug('Aptly task %d(%s) is still running' % (task.id, task.name))
|
|
else:
|
|
self.logger.debug('%s' % e)
|
|
continue
|
|
else:
|
|
# return 'SUCCEEDED' or 'FAILED'
|
|
return self.aptly.tasks.show(task.id).state
|
|
self.logger.warn('Aptly task %d(%s) timeouts.' % (task.id, task.name))
|
|
self.logger.info('Environment variable REPOMGR_REQ_TIMEOUT_FACTOR can be used to increase timeout value.')
|
|
self.logger.info('For example, set it to "5" can increase the timeout value by 5 times.')
|
|
return 'TIMEOUTED'
|
|
|
|
# Publish a local repository directly, without snapshot or signature
|
|
# If an old publish exists, drop it firstly and then create a new one.
|
|
# Do not use publish.update just for safety.
|
|
# (repo)repo_name ==> (publish)repo_name-suffix
|
|
def __quick_publish_repo(self, repo_name, suffix):
|
|
'''Create a publish based on a local repository directly, without snapshot.'''
|
|
# Caller already checked the repo_name, no need to check again
|
|
if not suffix:
|
|
self.logger.error('Quick publish needs suffix, none provided')
|
|
return
|
|
publish_name = '-'.join([repo_name, suffix])
|
|
publish_list = self.aptly.publish.list()
|
|
for publish in publish_list:
|
|
if publish.prefix == publish_name:
|
|
task = self.aptly.publish.drop(prefix=publish_name, distribution=publish.distribution, force_delete=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop failed publication %s : %s', publish_name, task_state)
|
|
return None
|
|
task = self.aptly.publish.publish(source_kind='local', sources=[{'Name': repo_name}],
|
|
architectures=['amd64', 'source'], prefix=publish_name,
|
|
distribution=None, sign_skip=True)
|
|
task_state = self.__wait_for_task(task, 10)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Quick publish for %s create failed: %s', publish_name, task_state)
|
|
return None
|
|
return publish_name + ' ' + 'bullseye'
|
|
|
|
# Publish a snap called "name" with prefix as name, "bullseye" as the distribution
|
|
# Return None or prefix/distribution
|
|
def __publish_snap(self, name):
|
|
'''Deploy a snapshot.'''
|
|
# Remove a same name publish if exists
|
|
publish_list = self.aptly.publish.list()
|
|
for publish in publish_list:
|
|
if publish.prefix == name:
|
|
task = self.aptly.publish.drop(prefix=name, distribution=publish.distribution, force_delete=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop publish failed %s : %s', name, task_state)
|
|
return None
|
|
|
|
# is_remote: True => remote repo; False => local repo
|
|
is_remote = False
|
|
mirror = None
|
|
mirror_list = self.aptly.mirrors.list()
|
|
for mirror in mirror_list:
|
|
if mirror.name == name:
|
|
is_remote = True
|
|
break
|
|
|
|
# crate a publish
|
|
extra_param = {}
|
|
if is_remote:
|
|
# it is a remote repo: info storied in "mirror"
|
|
# Add 'source' to publish source packages, if no source packages, that is also harmless.
|
|
extra_param['architectures'] = mirror.architectures.append('source')
|
|
extra_param['distribution'] = mirror.distribution
|
|
extra_param['origin'] = None
|
|
else:
|
|
# Only support binary_amd64 and source packages
|
|
extra_param['architectures'] = ['amd64', 'source']
|
|
extra_param['distribution'] = None
|
|
extra_param['origin'] = self.origin
|
|
|
|
extra_param['source_kind'] = 'snapshot'
|
|
extra_param['sources'] = [{'Name': name}]
|
|
extra_param['sign_skip'] = True
|
|
extra_param['prefix'] = name
|
|
task = self.aptly.publish.publish(source_kind='snapshot', sources=extra_param['sources'],
|
|
architectures=extra_param['architectures'], prefix=extra_param['prefix'],
|
|
distribution=extra_param['distribution'],
|
|
sign_gpgkey=SIGN_KEY, sign_passphrase=SIGN_PASSWD,
|
|
origin=extra_param['origin'])
|
|
task_state = self.__wait_for_task(task, 10)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Publication %s failed: %s' % (name, task_state))
|
|
return None
|
|
publish_list = self.aptly.publish.list()
|
|
for publish in publish_list:
|
|
if publish.prefix == name:
|
|
repo_str = publish.prefix + ' ' + publish.distribution
|
|
return repo_str
|
|
return None
|
|
|
|
# sync a remote mirror and deploy it
|
|
# Input: the name of the remote
|
|
# Output: bool
|
|
def deploy_remote(self, name):
|
|
'''Deploy a mirror, it will sync/update, snapshot and publish at last.
|
|
It may take minutes, depends on the size of the mirror and the bandwidth,
|
|
'''
|
|
if not name.startswith(PREFIX_REMOTE):
|
|
self.logger.warning('%s has no %s prefix, not a remote repository.', name, PREFIX_REMOTE)
|
|
return None
|
|
|
|
remote_list = self.aptly.mirrors.list()
|
|
remote = None
|
|
for remote in remote_list:
|
|
if remote.name == name:
|
|
break
|
|
|
|
if not remote:
|
|
self.logger.warning('mirror %s not find, please create it firstly.', name)
|
|
return None
|
|
|
|
if self.__update_mirror(name):
|
|
if self.__create_snapshot(name, False):
|
|
return self.__publish_snap(name)
|
|
return None
|
|
|
|
# info all remote repositories through logger
|
|
def list_remotes(self, quiet=False):
|
|
'''List all remote repositories/mirrors.'''
|
|
r_list = []
|
|
remote_list = self.aptly.mirrors.list()
|
|
if not len(remote_list):
|
|
if not quiet:
|
|
self.logger.info('No remote repo')
|
|
return r_list
|
|
if not quiet:
|
|
self.logger.info('%d remotes:', len(remote_list))
|
|
for remote in remote_list:
|
|
r_list.append(remote.name)
|
|
if not quiet:
|
|
self.logger.info('%s : %s : %s', remote.name, remote.archive_root, remote.distribution)
|
|
return r_list
|
|
|
|
# find and remove a remote
|
|
# Input: the name of the remote
|
|
# Output: Bool
|
|
def remove_remote(self, name):
|
|
'''Delete a remote repository/mirror and all related publish and snapshot.'''
|
|
if not name.startswith(PREFIX_REMOTE):
|
|
self.logger.warning('%s is not a correct remote name', name)
|
|
return False
|
|
|
|
# find and remove related publish
|
|
publish_list = self.aptly.publish.list()
|
|
for publish in publish_list:
|
|
if publish.prefix == name:
|
|
task = self.aptly.publish.drop(prefix=name, distribution=publish.distribution, force_delete=True)
|
|
self.__wait_for_task(task)
|
|
|
|
# find and remove related snapshot
|
|
snap_list = self.aptly.snapshots.list()
|
|
for snap in snap_list:
|
|
if snap.name == name:
|
|
task = self.aptly.snapshots.delete(snapshotname=name, force=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop snapshot failed %s : %s', name, task_state)
|
|
|
|
# find and remove the remote(mirror)
|
|
remote_list = self.aptly.mirrors.list()
|
|
for remote in remote_list:
|
|
if remote.name == name:
|
|
task = self.aptly.mirrors.drop(name=name, force=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop mirror failed %s : %s', name, task_state)
|
|
|
|
# Delete orphan files, wait up to 5 minutes for the cleanup to complete
|
|
task = self.aptly.db.cleanup()
|
|
self.__wait_for_task(task, 5)
|
|
|
|
return True
|
|
|
|
# info all local repositories through logger
|
|
def list_local(self, quiet=False):
|
|
'''List all local repository.'''
|
|
local_list = []
|
|
repo_list = self.aptly.repos.list()
|
|
if not len(repo_list):
|
|
self.logger.info('No local repo')
|
|
return local_list
|
|
if not quiet:
|
|
self.logger.info('%d local repos:', len(repo_list))
|
|
for repo in repo_list:
|
|
# rpo.name, repo.url, repo.distributions, repo.components
|
|
local_list.append(repo.name)
|
|
if not quiet:
|
|
self.logger.info('%s : %s : %s', repo.name, repo.default_distribution, repo.default_component)
|
|
return local_list
|
|
|
|
# Create a local repository
|
|
# Input:the name of the repo
|
|
# Output: None or repo
|
|
def create_local(self, local_name):
|
|
'''Create an empty local repository.'''
|
|
if not local_name.startswith(PREFIX_LOCAL):
|
|
self.logger.error('%s is not started with %s, Failed.', local_name, PREFIX_LOCAL)
|
|
raise ValueError('local repository create failed: prefix error.')
|
|
|
|
repo_list = self.aptly.repos.list()
|
|
for repo in repo_list:
|
|
if local_name == repo.name:
|
|
self.logger.warning('%s exists, please choose another name', local_name)
|
|
return None
|
|
|
|
# Static settings: bullseye main
|
|
repo = self.aptly.repos.create(local_name, default_distribution='bullseye', default_component='main')
|
|
return repo
|
|
|
|
# Upload a bundle of Debian package files into a local repository.
|
|
# For source package, all its package files need to be uploaded in one
|
|
# function call, or, uploaded files will not be inserted into repository
|
|
# but just deleted.
|
|
# Input:
|
|
# pkg_files: the path-name of the package files. If the file name
|
|
# contains "%3a", it will be replaced by ":".
|
|
# repo_name: the name of the local repository
|
|
# Output: Bool
|
|
def upload_pkg_local(self, pkg_files, repo_name):
|
|
'''Upload a bundle of package files into a local repository.'''
|
|
# sanity check: every package file is readable, local repository exists
|
|
if not pkg_files:
|
|
self.logger.warning('pkg_files should not be empty!')
|
|
return False
|
|
for pkg_file in set(pkg_files):
|
|
if not os.access(pkg_file, os.R_OK):
|
|
self.logger.warning('%s is NOT accessible to read.', pkg_file)
|
|
return False
|
|
if not repo_name.startswith(PREFIX_LOCAL):
|
|
self.logger.warning('%s is NOT a well formed name.', repo_name)
|
|
return False
|
|
|
|
repo_list = self.aptly.repos.list()
|
|
repo_found = False
|
|
for repo in repo_list:
|
|
if repo_name == repo.name:
|
|
self.logger.debug('repo %s was found and can be used', repo_name)
|
|
repo_found = True
|
|
break
|
|
|
|
if not repo_found:
|
|
self.logger.warning('repo %s does not exist, please create it first.', repo_name)
|
|
return False
|
|
|
|
# If the process was interrupted, leaving behind a file folder,
|
|
# clean it up by removing it before we start.
|
|
for file in self.aptly.files.list():
|
|
self.aptly.files.delete(file)
|
|
for pkg_file in set(pkg_files):
|
|
# For files with ":" in its filename, tools like 'apt' may replace it
|
|
# with '%3a' by mistake, this will cause error in aptly.
|
|
if pkg_file.find('%3a') >= 0:
|
|
rename_file = pkg_file.replace('%3a', ':')
|
|
try:
|
|
os.rename(pkg_file, rename_file)
|
|
except Exception as e:
|
|
self.logger.error('Error: %s' % e)
|
|
self.logger.error('Package file %s rename error.' % pkg_file)
|
|
raise Exception('Package file %s rename error, upload failed.' % pkg_file)
|
|
else:
|
|
# Upload package file into related file folder.
|
|
self.aptly.files.upload(repo_name, rename_file)
|
|
else:
|
|
self.aptly.files.upload(repo_name, pkg_file)
|
|
|
|
# Add uploaded file into local repository.
|
|
task = self.aptly.repos.add_uploaded_file(repo_name, repo_name, remove_processed_files=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('add_upload_file failed %s : %s : %s', list(pkg_files)[0], repo_name, task_state)
|
|
return True
|
|
|
|
# Delete a Debian package from a local repository.
|
|
# Input:
|
|
# local_repo: the name of the local repository
|
|
# pkg_name: the path-name of the deb file
|
|
# pkg_type: 'binary' or 'source'
|
|
# pkg_version: version of the deb file
|
|
# Output: None
|
|
def delete_pkg_local(self, local_repo, pkg_name, pkg_type, pkg_version=None):
|
|
'''Delete a binary package from a local repository.'''
|
|
# self.logger.debug('delete_pkg_local not supported yet.')
|
|
if pkg_type not in {'binary', 'source'}:
|
|
self.logger.error('package type must be one of either "binary" or "source"')
|
|
return
|
|
if not pkg_version:
|
|
query = pkg_name
|
|
else:
|
|
query = pkg_name + ' (' + pkg_version + ')'
|
|
# If we want more detailed info, add "detailed=True, with_deps=True" for search_packages.
|
|
search_result = self.aptly.repos.search_packages(local_repo, query=query)
|
|
self.logger.debug('delete_pkg_local find %d packages.' % len(search_result))
|
|
for pkg in search_result:
|
|
if (pkg_type == 'source' and pkg.key.split()[0] == 'Psource') or \
|
|
(pkg_type != 'source' and pkg.key.split()[0] != 'Psource'):
|
|
task = self.aptly.repos.delete_packages_by_key(local_repo, pkg.key)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Delete package failed %s : %s' % (pkg_name, task_state))
|
|
|
|
def pkg_list(self, repo_list):
|
|
'''list packages available from any of the listed repos, local or remote.'''
|
|
pkg_list=[]
|
|
for repo_name in repo_list:
|
|
if repo_name.startswith(PREFIX_LOCAL):
|
|
query = 'Name'
|
|
pkgs_raw = self.aptly.repos.search_packages(repo_name, query=query)
|
|
pkgs_key = [pkg.key for pkg in pkgs_raw]
|
|
elif repo_name.startswith(PREFIX_REMOTE):
|
|
pkgs_key = self.aptly.mirrors.packages(repo_name)
|
|
for key in pkgs_key:
|
|
pkg_name = key.split()[1]
|
|
pkg_ver = key.split()[2]
|
|
pkg_arch = key.split()[0][1:]
|
|
if pkg_arch == 'source':
|
|
pkg_list.append("%s_%s.dsc" % (pkg_name, pkg_ver))
|
|
else:
|
|
pkg_list.append("%s_%s_%s.deb" % (pkg_name, pkg_ver, pkg_arch))
|
|
return pkg_list
|
|
|
|
|
|
# Search a package in a set of repos, return True if find, or False
|
|
# repolist: a list of repo names, including local repo and mirror
|
|
# pkg_name: package name
|
|
# architecture: Architecture of the package, now, only check 'source' or not
|
|
# pkg_version: the version of the package, None means version insensitive
|
|
def pkg_exist(self, repo_list, pkg_name, architecture, pkg_version=None):
|
|
'''Search a package in a bundle of repositories including local repo and remote one.'''
|
|
for repo_name in repo_list:
|
|
if repo_name.startswith(PREFIX_LOCAL):
|
|
if not pkg_version:
|
|
query = pkg_name
|
|
else:
|
|
query = pkg_name + ' (' + pkg_version + ')'
|
|
# If we want more detailed info, add "detailed=True, with_deps=True" for search_packages.
|
|
search_result = self.aptly.repos.search_packages(repo_name, query=query)
|
|
for pkg in search_result:
|
|
if architecture != 'source' and pkg.key.split()[0] != 'Psource':
|
|
self.logger.debug('pkg_exist find package %s in %s.', pkg_name, repo_name)
|
|
return True
|
|
if architecture == 'source' and pkg.key.split()[0] == 'Psource':
|
|
self.logger.debug('pkg_exist find package %s in %s.', pkg_name, repo_name)
|
|
return True
|
|
elif repo_name.startswith(PREFIX_REMOTE):
|
|
pkgs = self.aptly.mirrors.packages(repo_name)
|
|
for pkg in pkgs:
|
|
if pkg.split()[1] == pkg_name:
|
|
if architecture != 'source' and pkg.split()[0] != 'Psource' and (not pkg_version or pkg_version == pkg.split()[2]):
|
|
self.logger.debug('pkg_exist find package %s in %s.', pkg_name, repo_name)
|
|
return True
|
|
if architecture == 'source' and pkg.split()[0] == 'Psource' and (not pkg_version or pkg_version == pkg.split()[2]):
|
|
self.logger.debug('pkg_exist find package %s in %s.', pkg_name, repo_name)
|
|
return True
|
|
return False
|
|
|
|
# Copy a set of packages from one repository into another
|
|
# source: the repository name that packages been copied from
|
|
# dest: the repository name that packages been copied to
|
|
# pkg_list: list of package name to be copied
|
|
# pkg_type: binary or source. Default is binary
|
|
# overwrite: True or False. Overwrite existing packages or not
|
|
def copy_pkgs(self, source, dest, pkg_list, pkg_type='binary', overwrite=True):
|
|
'''Copy package from one repository to another local repository'''
|
|
dest_exist = False
|
|
source_exist = False
|
|
# package key list of destination and source repository
|
|
dest_pkg_keys = list()
|
|
src_pkg_keys = list()
|
|
if source == dest:
|
|
self.logger.error('%s and %s are the same repository.' % (source, dest))
|
|
return False
|
|
for repo in self.aptly.repos.list():
|
|
if dest == repo.name:
|
|
dest_exist = True
|
|
pkgs = self.aptly.repos.search_packages(dest, query='Name')
|
|
dest_pkg_keys = [pkg.key for pkg in pkgs]
|
|
if source == repo.name:
|
|
source_exist = True
|
|
pkgs = self.aptly.repos.search_packages(source, query='Name')
|
|
src_pkg_keys = [pkg.key for pkg in pkgs]
|
|
if not dest_exist:
|
|
self.logger.warning('Destination repository %s does not exist.', dest)
|
|
return False
|
|
if not source_exist:
|
|
for repo in self.aptly.mirrors.list():
|
|
if source == repo.name:
|
|
source_exist = True
|
|
src_pkg_keys = self.aptly.mirrors.packages(source)
|
|
break
|
|
if not source_exist:
|
|
self.logger.warning('Source repository %s dose not exist.', source)
|
|
return False
|
|
del_keys = list()
|
|
add_keys = list()
|
|
for key in src_pkg_keys:
|
|
package_name = key.split()[1]
|
|
package_type = key.split()[0]
|
|
if package_name not in pkg_list:
|
|
continue
|
|
if (pkg_type == 'source' and package_type != 'Psource') or (pkg_type == 'binary' and package_type == 'Psource'):
|
|
continue
|
|
# Find a package in source repository to be copied.
|
|
pkg_list.remove(package_name)
|
|
# Already exists in destination repository
|
|
if key in dest_pkg_keys:
|
|
continue
|
|
pkg_in_dest = False
|
|
for dest_key in dest_pkg_keys:
|
|
# [0] package type/arch: Psource, Pamd64, Pall. [1] package name
|
|
if package_type == dest_key.split()[0] and package_name == dest_key.split()[1]:
|
|
pkg_in_dest = True
|
|
if overwrite:
|
|
del_keys.append(dest_key)
|
|
add_keys.append(key)
|
|
break
|
|
if not pkg_in_dest:
|
|
add_keys.append(key)
|
|
if not pkg_list:
|
|
break
|
|
|
|
# check to see if any packages not find in source repository
|
|
if pkg_list:
|
|
self.logger.warning('Copy package error, %s package %s not exist in %s' % (pkg_type, ' '.join(pkg_list), source))
|
|
return False
|
|
# Remove duplicate packages from destination repository
|
|
if del_keys:
|
|
task = self.aptly.repos.delete_packages_by_key(dest, *del_keys)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Delete packages failed: %s\n%s' % (task_state, '\n'.join(del_keys)))
|
|
return False
|
|
# Insert packages into destination repository
|
|
if add_keys:
|
|
task = self.aptly.repos.add_packages_by_key(dest, *add_keys)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Copy packages failed: %s\n%s' % (task_state, '\n'.join(add_keys)))
|
|
return False
|
|
return True
|
|
|
|
# Merge several repositories into a new one(just snapshot and publish)
|
|
# name: the name of the new build snapshot/publish
|
|
# source_snapshots: list, snapshots to be merged
|
|
def merge_repos(self, name, source_snapshots):
|
|
'''Merge several repositories into a new publish.'''
|
|
if not name.startswith(PREFIX_MERGE):
|
|
self.logger.warning('The name should started with %s.', PREFIX_MERGE)
|
|
return None
|
|
|
|
if self.__merge_snapshot(name, source_snapshots):
|
|
ret = self.__publish_snap(name)
|
|
return ret
|
|
|
|
# deploy a local repository
|
|
# Input
|
|
# name: the name of the local repository
|
|
# suffix: suffix of the publish name
|
|
# Output: None or DebAptDistributionResponse
|
|
def deploy_local(self, name, suffix=''):
|
|
'''Deploy a local repository.'''
|
|
if not name.startswith(PREFIX_LOCAL):
|
|
self.logger.warning('%s is NOT a well formed name.', name)
|
|
return None
|
|
|
|
repo_list = self.aptly.repos.list()
|
|
repo_find = False
|
|
for repo in repo_list:
|
|
if name == repo.name:
|
|
self.logger.debug('%s find, can be used', name)
|
|
repo_find = True
|
|
break
|
|
if not repo_find:
|
|
self.logger.warning('local repo %s not found.', name)
|
|
return None
|
|
|
|
if suffix:
|
|
return self.__quick_publish_repo(name, suffix)
|
|
|
|
if self.__create_snapshot(name, True):
|
|
ret = self.__publish_snap(name)
|
|
# Delete orphan files, wait up to 5 minutes for the cleanup to complete
|
|
task = self.aptly.db.cleanup()
|
|
self.__wait_for_task(task, 5)
|
|
return ret
|
|
return None
|
|
|
|
# remove a local repository
|
|
# Input: the name of the local repository
|
|
# Output: None
|
|
def remove_local(self, name):
|
|
'''Delete a local repository, including related publish and snapshot.'''
|
|
if not name.startswith(PREFIX_LOCAL):
|
|
self.logger.warning('%s is not a correct name', name)
|
|
return None
|
|
|
|
# find and remove related publish
|
|
publish_list = self.aptly.publish.list()
|
|
for publish in publish_list:
|
|
# Remove all related publish including quick publish
|
|
if publish.prefix.startswith(name + '-') or publish.prefix == name:
|
|
task = self.aptly.publish.drop(prefix=publish.prefix, distribution='bullseye', force_delete=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop publish failed %s : %s', name, task_state)
|
|
|
|
# find and remove related snapshot
|
|
snap_list = self.aptly.snapshots.list()
|
|
for snap in snap_list:
|
|
if snap.name == name:
|
|
task = self.aptly.snapshots.delete(snapshotname=name, force=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop snapshot failed %s : %s', name, task_state)
|
|
|
|
# find and remove the remote(mirror)
|
|
repo_list = self.aptly.repos.list()
|
|
for repo in repo_list:
|
|
if repo.name == name:
|
|
task = self.aptly.repos.delete(reponame=name, force=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop repo failed %s : %s', name, task_state)
|
|
|
|
# Delete orphan files, wait up to 5 minutes for the cleanup to complete
|
|
task = self.aptly.db.cleanup()
|
|
self.__wait_for_task(task, 5)
|
|
|
|
return None
|
|
|
|
# clean all metadata including remote, repository, public, distribution, task and content
|
|
# In theory, with this operation, there should be nothing left in aptly_deb
|
|
# database. Please use it carefully.
|
|
def clean_all(self):
|
|
'''Clean all metadata including remote, repository, public, distribution, task and content.'''
|
|
# clean publishes
|
|
pub_list = self.aptly.publish.list()
|
|
self.logger.info('%d publish', len(pub_list))
|
|
for pub in pub_list:
|
|
task = self.aptly.publish.drop(prefix=pub.prefix, distribution=pub.distribution, force_delete=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop publish failed %s : %s', pub.frepix, task_state)
|
|
# clean snapshots
|
|
snap_list = self.aptly.snapshots.list()
|
|
self.logger.info('%d snapshot', len(snap_list))
|
|
for snap in snap_list:
|
|
task = self.aptly.snapshots.delete(snapshotname=snap.name, force=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop snapshot failed %s : %s', snap.name, task_state)
|
|
|
|
# clean mirrors
|
|
mirror_list = self.aptly.mirrors.list()
|
|
self.logger.info('%d mirror', len(mirror_list))
|
|
for mirror in mirror_list:
|
|
task = self.aptly.mirrors.drop(name=mirror.name, force=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop mirror failed %s : %s', mirror.name, task_state)
|
|
# clean local repos
|
|
repo_list = self.aptly.repos.list()
|
|
self.logger.info('%d repo', len(repo_list))
|
|
for repo in repo_list:
|
|
task = self.aptly.repos.delete(reponame=repo.name, force=True)
|
|
task_state = self.__wait_for_task(task)
|
|
if task_state != 'SUCCEEDED':
|
|
self.logger.warning('Drop repo failed %s : %s', repo.name, task_state)
|
|
# clean file folders
|
|
file_list = self.aptly.files.list()
|
|
self.logger.info('%d file folder', len(file_list))
|
|
for file in file_list:
|
|
self.aptly.files.delete(file)
|
|
# clean tasks
|
|
self.aptly.tasks.clear()
|
|
# Delete orphan files, up to 5 minutes
|
|
task = self.aptly.db.cleanup()
|
|
self.__wait_for_task(task, 5)
|