downloader: succeed on GPG errors in DSCs files

Downloader sometimes fails on external .dsc files signed with unusable
GPG keys:
- expired keys
- .dsc file signed with a key that is not in apt's GPG database.
These files are typically signed by individual package maintainers, not
the official Debian/binary repository key that APT uses for binary
package indexes.

Downloader uses 2 different methods to fetch source packages, which
behave differently w.r.t. GPG checks:
- with "archive" key present in meta_data.yaml, we download using the
  "dget" utility, which in turn validates against a set of known keys
  in /usr/share/keyrings/
- without "archive", we download using "apt-get source", which doesn't
  validate GPG at all

This patch makes the downloader script ignore GPG signature errors, but
print a warning in the log. We also allow package maintainers to add an
additional option, "dsc_sha256", to check the .dsc file's checksum,
instead of its GPG signature (this will suppress the warning).
Rationale:
- it's difficult to make GPG verification accept expired keys
- we always verify sha256 checksums of the files making up the source
  package (ie the files referenced by .dsc)
- as for the .dsc file itself, we only verify its checksum if it is
  present in meta_data.yaml, "dsc_sha256". No packages do that as of
  this writing.

CHANGES
=====================
- utils.py: add a slightly different version of run_shell_cmd that
  captures and returns both STDOUT and STDERR
- debrepack.py:
  * use "dscverify" (from "devscripts" package) to verify .dsc files
  * if normal verification fails, try again with GPG check disabled
  * new key in meta_data.yaml: dsc_sha256. If present, make sure .dsc
    file's checksum matches.
  * workaround for "dget" and "dscverify" falsely succeeding when
    files referenced by .dsc are missing
  * removed functions "download_check_dsc" and "check_dsc" as they are
    no longer used after this change

HOW TO REPRODUCE
=====================
One example of a package signed with a key that is not in the current
(bullseye) version of debian-keyring, is golang-github-golang-jwt-jwt
[1]. The download fails on that package.

TESTS
=====================
- Remove golang-github-golang-jwt-jwt from /import/mirrors and reproduce
  the download error (unknown GPG key)
- Apply this patch, remove all downloaded sources, re-run downloader and
  make sure it succeeds with GPG-related warnings
- Simulate various problems with a .dsc file and make sure they are
  detected (ie the script fails):
  * .dsc URL in meta_data.yml returns http 404
  * one of the files referenced by .dsc returns http 404
  * one of the checksums in .dsc doesn't match
  * .dsc checksum it self doesn't match "dsc_sha256" in meta_data.yaml

[1] 2b7ac3c340/golang-github-dev/golang-github-golang-jwt-jwt-dev/debian/meta_data.yaml

Closes-Bug: 2072650
Signed-off-by: Davlet Panech <davlet.panech@windriver.com>
Change-Id: I2c91a997eafdcfd546d79e575c81bf6f9530ca0a
This commit is contained in:
Davlet Panech 2024-07-09 16:45:02 -04:00
parent 0a3dc5d5dc
commit e66aead87b
2 changed files with 151 additions and 65 deletions

View File

@ -24,9 +24,11 @@ import os
import progressbar
import re
import shutil
import subprocess
import sys
import tempfile
import utils
from utils import run_shell_cmd, get_download_url
from utils import run_shell_cmd, run_shell_cmd_full, get_download_url
import yaml
@ -79,35 +81,52 @@ class DownloadProgress():
else:
self.pbar.finish()
def verify_dsc_file(dsc_file, sha256, logger):
def checksum_dsc(dsc_file, logger):
# with sha256 supplied, verify it, but not the GPG signature
if sha256:
if not checksum(dsc_file, sha256, 'sha256sum', logger):
return False
try:
cmd = 'dscverify --nosigcheck %s' % dsc_file
out,err = run_shell_cmd_full(cmd, logger, logging.INFO)
except subprocess.CalledProcessError:
logger.warning ('%s: dscverify failed', dsc_file)
return False
# fall through
logger.info("validating %s" % dsc_file)
if not os.path.exists(dsc_file):
# otherwise verify the GPG signature, and if its the only problem,
# print a warning, but return success
else:
# verify with GPG check
try:
cmd = 'dscverify --verbose %s' % dsc_file
out,err = run_shell_cmd_full(cmd, logger, logging.INFO)
except subprocess.CalledProcessError:
# try again without a GPG check
try:
cmd = 'dscverify --nosigcheck %s' % dsc_file
out,err = run_shell_cmd_full(cmd, logger, logging.INFO)
except subprocess.CalledProcessError:
logger.warning ('%s: dscverify failed', dsc_file)
return False
# succeeded w/o GPG check: print a warning
logger.warning('%s: GPG signature check failed. You can suppress ' +
'this warning by adding a dsc_sha256 option with the ' +
'checksum of the .dsc file, to meta_data.yaml',
dsc_file)
# fall through
# fall through
# At this point "err" is the stderr of the most recent "dscverify" invocation.
# If some files are missing, dscverify succeeds, but prints warnings to stderr.
# Look for those and assume verification failed in this case.
if err.find('(not present)') != -1:
logger.warning ('%s: one or more referenced files are missing', dsc_file)
return False
with open(dsc_file) as f:
c = debian.deb822.Dsc(f)
base_dir = os.path.dirname(dsc_file)
for f in c['Checksums-Sha256']:
local_f = os.path.join(base_dir, f['name'])
if not checksum(local_f, f['sha256'], "sha256sum", logger):
return False
return True
def download_check_dsc(dsc_filename, dget_files, logger):
with open(dsc_filename,'r') as f:
c = debian.deb822.Dsc(f)
files = c["Files"]
for file in files:
filename = file['name']
if filename not in dget_files:
logger.error("Error: file %s not found. Error in download" % file)
return False
return True
def get_str_md5(text):
@ -220,8 +239,10 @@ class Parser():
self.srcrepo = srcrepo
self.btype = btype
self.meta_data = dict()
self.meta_data_file = None
self.versions = dict()
self.pkginfo = dict()
self.dsc_sha256 = None
def setup(self, pkgpath):
@ -238,11 +259,11 @@ class Parser():
self.logger.error("No debian folder")
raise Exception("No debian folder")
meta_data = os.path.join(self.pkginfo["debfolder"], "meta_data.yaml")
if not os.path.exists(meta_data):
self.logger.error("Not find meta_data.yaml")
raise Exception("Not find meta_data.yaml")
with open(meta_data) as f:
self.meta_data_file = os.path.join(self.pkginfo["debfolder"], "meta_data.yaml")
if not os.path.exists(self.meta_data_file):
self.logger.error("%s: file not found", self.meta_data_file)
raise Exception("%s: not find meta_data.yaml" % self.meta_data_file)
with open(self.meta_data_file) as f:
self.meta_data = yaml.full_load(f)
if "debver" not in self.meta_data:
@ -291,6 +312,14 @@ class Parser():
srcdir = self.pkginfo["debname"] + "-" + self.versions["upstream_version"]
self.pkginfo["srcdir"] = os.path.join(self.pkginfo["packdir"], srcdir)
if 'dsc_sha256' in self.meta_data:
self.dsc_sha256 = self.meta_data.get('dsc_sha256')
if not self.dsc_sha256:
raise Exception('%s: invalid empty key dsc_sha256' % self.meta_data_file)
else:
self.dsc_sha256 = None
def set_build_type(self):
local_debian = os.path.join(self.pkginfo["packdir"], "local_debian")
@ -777,46 +806,96 @@ class Parser():
elif "archive" in self.meta_data:
ver = self.versions["full_version"].split(":")[-1]
dsc_filename = self.pkginfo["debname"] + "_" + ver + ".dsc"
if checksum_dsc(dsc_filename, self.logger) is False:
dsc_file_upstream = os.path.join(self.meta_data["archive"], dsc_filename)
(dl_url, alt_dl_url) = get_download_url(dsc_file_upstream, self.strategy)
if alt_dl_url:
try:
run_shell_cmd("dget -d %s" % dl_url, self.logger)
except:
run_shell_cmd("dget -d %s" % alt_dl_url, self.logger)
else:
run_shell_cmd("dget -d %s" % dl_url, self.logger)
# check if all files from .dsc files were downloaded by dget
dget_files = run_shell_cmd("ls -m",self.logger)
if download_check_dsc(dsc_filename,dget_files, self.logger) is False:
raise Exception(f'Failed to download {dl_file}')
if not os.path.exists(dsc_filename) or not verify_dsc_file(dsc_filename, self.dsc_sha256, logger=self.logger):
self.logger.info ('%s: file not found, or integrity verification failed; (re-)downloading...', dsc_filename)
# save to a temporary directory, then move into place
dl_dir = '%s/tmp' % saveto
run_shell_cmd('rm -rf "%s" && mkdir -p "%s"' % (dl_dir, dl_dir), self.logger)
os.chdir(dl_dir)
try:
dsc_file_upstream = os.path.join(self.meta_data["archive"], dsc_filename)
(dl_url, alt_dl_url) = get_download_url(dsc_file_upstream, self.strategy)
# download w/o GPG verification
dget_flags = '--download-only --allow-unauthenticated'
if alt_dl_url:
try:
run_shell_cmd("dget %s %s" % (dget_flags, dl_url), self.logger)
except:
run_shell_cmd("dget %s %s" % (dget_flags, alt_dl_url), self.logger)
else:
run_shell_cmd("dget %s %s" % (dget_flags, dl_url), self.logger)
# verify checksums/signatures
if not verify_dsc_file(dsc_filename, self.dsc_sha256, logger=self.logger):
raise Exception('%s: %s: DSC file verification failed' % (self.meta_data_file, dsc_filename))
# move downloaded files into place
run_shell_cmd('find "%s" -mindepth 1 -maxdepth 1 -exec mv -f -t "%s" "{}" "+"' % (dl_dir, saveto), self.logger)
run_shell_cmd('rmdir "%s"' % dl_dir, self.logger)
finally:
os.chdir(saveto)
# Upload it to aptly
# FIXME: this parameter is always None (?)
if self.srcrepo is not None:
self.upload_deb_package()
elif "src_path" not in self.meta_data and "dl_hook" not in self.meta_data:
ver = self.versions["full_version"].split(":")[-1]
dsc_filename = self.pkginfo["debname"] + "_" + ver + ".dsc"
if checksum_dsc(dsc_filename, self.logger) is True:
os.chdir(pwd)
return
fullname = self.pkginfo["debname"] + "=" + self.versions["full_version"]
supported_versions = list()
# See also comments in the "archive" section above.
apt_pkg.init()
sources = apt_pkg.SourceRecords()
source_lookup = sources.lookup(self.pkginfo["debname"])
while source_lookup and self.versions["full_version"] != sources.version:
supported_versions.append(sources.version)
source_lookup = sources.lookup(self.pkginfo["debname"])
if not os.path.exists(dsc_filename) or not verify_dsc_file(dsc_filename, self.dsc_sha256, logger=self.logger):
self.logger.info ('%s: file not found, or integrity verification failed; (re-)downloading...', dsc_filename)
if not source_lookup:
self.logger.error("No source for %s", fullname)
self.logger.info("The supported versions are %s", supported_versions)
raise ValueError(f"No source for {fullname}")
# save to a temporary directory, then move into place
dl_dir = '%s/tmp' % saveto
run_shell_cmd('rm -rf "%s" && mkdir -p "%s"' % (dl_dir, dl_dir), self.logger)
os.chdir(dl_dir)
self.logger.info("Fetch %s to %s", fullname, self.pkginfo["packdir"])
run_shell_cmd("apt-get source -d %s" % fullname, self.logger)
try:
fullname = self.pkginfo["debname"] + "=" + self.versions["full_version"]
supported_versions = list()
apt_pkg.init()
sources = apt_pkg.SourceRecords()
source_lookup = sources.lookup(self.pkginfo["debname"])
while source_lookup and self.versions["full_version"] != sources.version:
supported_versions.append(sources.version)
source_lookup = sources.lookup(self.pkginfo["debname"])
if not source_lookup:
self.logger.error("No source for %s", fullname)
self.logger.info("The supported versions are %s", supported_versions)
raise ValueError(f"No source for {fullname}")
# download w/o GPG verification
apt_get_flags = '--download-only --allow-unauthenticated'
self.logger.info("Fetch %s to %s", fullname, self.pkginfo["packdir"])
run_shell_cmd("apt-get source %s %s" % (apt_get_flags, fullname), self.logger)
# verify checksums/signatures
if not verify_dsc_file(dsc_filename, self.dsc_sha256, logger=self.logger):
raise Exception('%s: %s: DSC file verification failed' % (self.meta_data_file, dsc_filename))
# move downloaded files into place
run_shell_cmd('find "%s" -mindepth 1 -maxdepth 1 -exec mv -t "%s" "{}" "+"' % (dl_dir, saveto), self.logger)
run_shell_cmd('rmdir "%s"' % dl_dir, self.logger)
finally:
os.chdir(saveto)
# Upload it to aptly
# FIXME: this parameter is always None (?)
if self.srcrepo is not None:
self.upload_deb_package()

View File

@ -135,7 +135,7 @@ def limited_walk(dir, max_depth=1):
del dirs[:]
def run_shell_cmd(cmd, logger):
def run_shell_cmd_full(cmd, logger, error_level=logging.ERROR):
if type(cmd) is str:
shell = True
elif type(cmd) in (tuple, list):
@ -149,7 +149,7 @@ def run_shell_cmd(cmd, logger):
universal_newlines=True, shell=shell)
except Exception as e:
msg = f'[ Failed to execute command: "{cmd}" Exception: "{e}" ]'
logger.error(msg)
logger.log(error_level, msg)
# Suppress the original exception when raising our own exception.
# Syntax is acquired from: https://peps.python.org/pep-0409/#proposal
raise Exception(msg) from None
@ -168,11 +168,18 @@ def run_shell_cmd(cmd, logger):
if process.returncode != 0:
msg = f'[ Command failed with a non-zero return code: "{cmd}" return code: {process.returncode} ]'
logger.error(msg)
raise Exception(msg)
logger.log(error_level, msg)
raise subprocess.CalledProcessError(
returncode=process.returncode,
cmd=cmd,
output=outs,
stderr=errs
)
return outs.strip()
return outs.strip(),errs.strip()
def run_shell_cmd(cmd, logger, error_level=logging.ERROR):
return run_shell_cmd_full(cmd, logger, error_level)[0]
def url_to_stx_mirror(url):