Add support for CentOS SUM files

The CentOS Stream SUM files use the format:
  # FILENAME: <size> bytes
  ALGORITHM (FILENAME) = CHECKSUM

Compared to the more common format:
  CHECKSUM  *FILE_A
  CHECKSUM  FILE_B

Use regular expressions to check for the filename both
in the middle with parentheses and at the end.
Similarly look for valid checksums at the beginning or
end of the line. Also look for known checksum patterns in
case the file only contains the checksum itself.

Change-Id: I9e49c1a6c66e51a7b884485f0bcaf7f1802bda33
This commit is contained in:
Harald Jensås 2023-05-03 15:02:51 +02:00
parent f37ea85a27
commit e7a048ecbe
No known key found for this signature in database
GPG Key ID: 693852E00DCEA408
3 changed files with 180 additions and 6 deletions

View File

@ -14,6 +14,7 @@
import hashlib import hashlib
import os import os
import re
import tempfile import tempfile
import time import time
from urllib import parse as urlparse from urllib import parse as urlparse
@ -107,6 +108,24 @@ def _is_checksum_url(checksum):
return False return False
# Regular expressions used to parse checksum files.
#
# Hex digits are spelled out as ``0-9`` instead of ``\d``: for ``str``
# patterns ``\d`` also matches non-ASCII Unicode decimal digits, which can
# never be part of a valid hexadecimal digest.
MD5_MATCH = r"^([a-fA-F0-9]{32})\s"  # MD5 at beginning of line
MD5_MATCH_END = r"\s([a-fA-F0-9]{32})$"  # MD5 at end of line
MD5_MATCH_ONLY = r"^([a-fA-F0-9]{32})$"  # MD5 only
SHA256_MATCH = r"^([a-fA-F0-9]{64})\s"  # SHA256 at beginning of line
SHA256_MATCH_END = r"\s([a-fA-F0-9]{64})$"  # SHA256 at end of line
SHA256_MATCH_ONLY = r"^([a-fA-F0-9]{64})$"  # SHA256 only
SHA512_MATCH = r"^([a-fA-F0-9]{128})\s"  # SHA512 at beginning of line
SHA512_MATCH_END = r"\s([a-fA-F0-9]{128})$"  # SHA512 at end of line
SHA512_MATCH_ONLY = r"^([a-fA-F0-9]{128})$"  # SHA512 only
# The filename patterns are templates: substitute ``{filename}`` with
# ``str.format`` before use. The name is inserted into the regular
# expression verbatim, so pass it through ``re.escape`` first if it has to
# match literally.
FILENAME_MATCH_END = r"\s[*]?{filename}$"  # Filename binary/text end of line
FILENAME_MATCH_PARENTHESES = r"\s\({filename}\)\s"  # CentOS images
# Checksum located at the beginning or at the end of a line.
CHECKSUM_MATCHERS = (MD5_MATCH, MD5_MATCH_END, SHA256_MATCH, SHA256_MATCH_END,
                     SHA512_MATCH, SHA512_MATCH_END)
# The line consists of nothing but the checksum.
CHECKSUM_ONLY_MATCHERS = (MD5_MATCH_ONLY, SHA256_MATCH_ONLY, SHA512_MATCH_ONLY)
# Filename at the end of a line or enclosed in parentheses.
FILENAME_MATCHERS = (FILENAME_MATCH_END, FILENAME_MATCH_PARENTHESES)
def _fetch_checksum(checksum, image_info): def _fetch_checksum(checksum, image_info):
"""Fetch checksum from remote location, if needed.""" """Fetch checksum from remote location, if needed."""
if not _is_checksum_url(checksum): if not _is_checksum_url(checksum):
@ -121,17 +140,33 @@ def _fetch_checksum(checksum, image_info):
elif len(lines) == 1: elif len(lines) == 1:
# Special case - checksums file with only the checksum itself # Special case - checksums file with only the checksum itself
if ' ' not in lines[0]: if ' ' not in lines[0]:
return lines[0] for matcher in CHECKSUM_ONLY_MATCHERS:
checksum = re.findall(matcher, lines[0])
if checksum:
return checksum[0]
raise errors.ImageDownloadError(
checksum, ("Invalid checksum file (No valid checksum found) %s"
% lines))
# FIXME(dtantsur): can we assume the same name for all images? # FIXME(dtantsur): can we assume the same name for all images?
expected_fname = os.path.basename(urlparse.urlparse( expected_fname = os.path.basename(urlparse.urlparse(
image_info['urls'][0]).path) image_info['urls'][0]).path)
for line in lines: for line in lines:
checksum, fname = line.strip().split(None, 1) # Ignore comment lines
# The star symbol designates binary mode, which is the same as text if line.startswith("#"):
# mode on GNU systems. continue
if fname.strip().lstrip('*') == expected_fname:
return checksum.strip() # Ignore checksums for other files
for matcher in FILENAME_MATCHERS:
if re.findall(matcher.format(filename=expected_fname), line):
break
else:
continue
for matcher in CHECKSUM_MATCHERS:
checksum = re.findall(matcher, line)
if checksum:
return checksum[0]
raise errors.ImageDownloadError( raise errors.ImageDownloadError(
checksum, "Checksum file does not contain name %s" % expected_fname) checksum, "Checksum file does not contain name %s" % expected_fname)

View File

@ -1675,6 +1675,113 @@ foobar irrelevant file.img
]) ])
self.assertEqual(fake_cs, image_download._hash_algo.hexdigest()) self.assertEqual(fake_cs, image_download._hash_algo.hexdigest())
def test_download_image_and_centos_checksum_md5(self, requests_mock,
hash_mock):
# The checksum URL serves a file in the CentOS format: a '#' comment line
# followed by 'MD5 (FILENAME) = DIGEST'. The image must still download and
# the reported digest must equal the one from the checksum file.
content = ['SpongeBob', 'SquarePants']
fake_cs = "019fe036425da1c562f2e9f5299820bf"
cs_response = mock.Mock()
cs_response.status_code = 200
cs_response.text = """
# centos-image.img: 1005593088 bytes
MD5 (centos-image.img) = %s
""" % fake_cs
response = mock.Mock()
response.status_code = 200
response.iter_content.return_value = content
# First request returns the checksum file, the second one the image.
requests_mock.side_effect = [cs_response, response]
image_info = _build_fake_image_info(
'http://example.com/path/centos-image.img')
image_info['checksum'] = 'http://example.com/checksum'
# Remove the os_hash_* keys; the checksum is supplied through the
# 'checksum' URL set above.
del image_info['os_hash_algo']
del image_info['os_hash_value']
# Turn on the md5_enabled option for this test.
CONF.set_override('md5_enabled', True)
hash_mock.return_value.hexdigest.return_value = fake_cs
image_download = standby.ImageDownload(image_info)
self.assertEqual(content, list(image_download))
# Both the checksum URL and the image URL must have been requested.
requests_mock.assert_has_calls([
mock.call('http://example.com/checksum', cert=None,
verify=True,
stream=True, proxies={}, timeout=60),
mock.call(image_info['urls'][0], cert=None, verify=True,
stream=True, proxies={}, timeout=60),
])
self.assertEqual(fake_cs, image_download._hash_algo.hexdigest())
def test_download_image_and_centos_checksum_sha256(self, requests_mock,
hash_mock):
# The checksum URL serves a file in the CentOS format: a '#' comment line
# followed by 'SHA256 (FILENAME) = DIGEST'. The image must still download
# and the hash mock must have been called with 'sha256'.
content = ['SpongeBob', 'SquarePants']
fake_cs = ('3b678e4fb651d450f4970e1647abc9b0a38bff3febd3d558753'
'623c66369a633')
cs_response = mock.Mock()
cs_response.status_code = 200
cs_response.text = """
# centos-image.img: 1005593088 bytes
SHA256 (centos-image.img) = %s
""" % fake_cs
response = mock.Mock()
response.status_code = 200
response.iter_content.return_value = iter(content)
# First request returns the checksum file, the second one the image.
requests_mock.side_effect = [cs_response, response]
image_info = _build_fake_image_info(
'http://example.com/path/centos-image.img')
image_info['checksum'] = 'http://example.com/checksum'
# Remove the os_hash_* keys; the checksum is supplied through the
# 'checksum' URL set above.
del image_info['os_hash_algo']
del image_info['os_hash_value']
hash_mock.return_value.hexdigest.return_value = fake_cs
image_download = standby.ImageDownload(image_info)
self.assertEqual(content, list(image_download))
# Both the checksum URL and the image URL must have been requested.
requests_mock.assert_has_calls([
mock.call('http://example.com/checksum', cert=None,
verify=True,
stream=True, proxies={}, timeout=60),
mock.call(image_info['urls'][0], cert=None, verify=True,
stream=True, proxies={}, timeout=60),
])
self.assertEqual(fake_cs, image_download._hash_algo.hexdigest())
# The algorithm name passed to the hash mock must be 'sha256'.
hash_mock.assert_has_calls([
mock.call('sha256')])
def test_download_image_and_centos_checksum_sha512(self, requests_mock,
hash_mock):
# The checksum URL serves a file in the CentOS format: a '#' comment line
# followed by 'SHA512 (FILENAME) = DIGEST'. The image must still download
# and the hash mock must have been called with 'sha512'.
content = ['SpongeBob', 'SquarePants']
fake_cs = ('3b678e4fb651d450f4970e1647abc9b0a38bff3febd3d558753'
'623c66369a6333b678e4fb651d450f4970e1647abc9b0a38b'
'ff3febd3d558753623c66369a633')
cs_response = mock.Mock()
cs_response.status_code = 200
cs_response.text = """
# centos-image.img: 1005593088 bytes
SHA512 (centos-image.img) = %s
""" % fake_cs
response = mock.Mock()
response.status_code = 200
response.iter_content.return_value = iter(content)
# First request returns the checksum file, the second one the image.
requests_mock.side_effect = [cs_response, response]
image_info = _build_fake_image_info(
'http://example.com/path/centos-image.img')
image_info['checksum'] = 'http://example.com/checksum'
# Remove the os_hash_* keys; the checksum is supplied through the
# 'checksum' URL set above.
del image_info['os_hash_algo']
del image_info['os_hash_value']
hash_mock.return_value.hexdigest.return_value = fake_cs
image_download = standby.ImageDownload(image_info)
self.assertEqual(content, list(image_download))
# Both the checksum URL and the image URL must have been requested.
requests_mock.assert_has_calls([
mock.call('http://example.com/checksum', cert=None,
verify=True,
stream=True, proxies={}, timeout=60),
mock.call(image_info['urls'][0], cert=None, verify=True,
stream=True, proxies={}, timeout=60),
])
self.assertEqual(fake_cs, image_download._hash_algo.hexdigest())
# The algorithm name passed to the hash mock must be 'sha512'.
hash_mock.assert_has_calls([
mock.call('sha512')])
def test_download_image_and_checksum_multiple_sha256(self, requests_mock, def test_download_image_and_checksum_multiple_sha256(self, requests_mock,
hash_mock): hash_mock):
content = ['SpongeBob', 'SquarePants'] content = ['SpongeBob', 'SquarePants']
@ -1885,3 +1992,24 @@ foobar irrelevant file.img
'Received status code 400 from ' 'Received status code 400 from '
'http://example.com/checksum', 'http://example.com/checksum',
standby.ImageDownload, image_info) standby.ImageDownload, image_info)
def test_download_image_and_invalid_checksum(self, requests_mock,
hash_mock):
# A single-line checksum file whose content is not a recognized digest
# must make ImageDownload raise ImageDownloadError.
content = ['SpongeBob', 'SquarePants']
fake_cs = "invalid"
cs_response = mock.Mock()
cs_response.status_code = 200
cs_response.text = fake_cs + '\n'
response = mock.Mock()
response.status_code = 200
response.iter_content.return_value = content
requests_mock.side_effect = [cs_response, response]
image_info = _build_fake_image_info(
'http://example.com/path/image.img')
# Here the checksum URL is supplied through os_hash_value.
image_info['os_hash_algo'] = 'sha512'
image_info['os_hash_value'] = 'http://example.com/checksum'
# The error message is expected to include the offending lines.
self.assertRaisesRegex(
errors.ImageDownloadError,
r"Invalid checksum file \(No valid checksum found\) \['invalid'\]",
standby.ImageDownload, image_info)

View File

@ -0,0 +1,11 @@
---
features:
- |
Improved parsing of checksum files.
* Added support for the ``ALGORITHM (FILENAME) = CHECKSUM`` format used by
CentOS Stream.
* Lines starting with ``#`` are ignored as comments.
* If the checksum file contains only the checksum itself, the content is
validated to ensure it is one of the known checksum types.