Merge "Add secure hash validation for image downloads"
This commit is contained in:
@@ -10,22 +10,65 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections.abc
|
||||
import hashlib
|
||||
import io
|
||||
import typing as ty
|
||||
|
||||
from openstack import exceptions
|
||||
from openstack import utils
|
||||
|
||||
|
||||
def _verify_checksum(md5, checksum):
|
||||
if checksum:
|
||||
digest = md5.hexdigest()
|
||||
if digest != checksum:
|
||||
def _verify_checksum(
|
||||
hasher: ty.Any,
|
||||
expected_hash: str | None,
|
||||
hash_algo: str | None = None,
|
||||
) -> None:
|
||||
"""Verify checksum using the provided hasher.
|
||||
|
||||
:param hasher: A hashlib hash object
|
||||
:param expected_hash: The expected hexdigest value
|
||||
:param hash_algo: Optional name of the hash algorithm for error messages
|
||||
:raises: InvalidResponse if the hash doesn't match
|
||||
"""
|
||||
if expected_hash:
|
||||
digest = hasher.hexdigest()
|
||||
if digest != expected_hash:
|
||||
algo_msg = f" ({hash_algo})" if hash_algo else ""
|
||||
raise exceptions.InvalidResponse(
|
||||
f"checksum mismatch: {checksum} != {digest}"
|
||||
f"checksum mismatch{algo_msg}: {expected_hash} != {digest}"
|
||||
)
|
||||
|
||||
|
||||
def _integrity_iter(
|
||||
iterable: collections.abc.Iterable[bytes],
|
||||
hasher: ty.Any,
|
||||
expected_hash: str | None,
|
||||
hash_algo: str | None,
|
||||
) -> collections.abc.Iterator[bytes]:
|
||||
"""Check image data integrity
|
||||
|
||||
:param iterable: Iterable containing the image data chunks
|
||||
:param hasher: A hashlib hash object
|
||||
:param expected_hash: The expected hexdigest value
|
||||
:param hash_algo: The hash algorithm
|
||||
:yields: Chunks of data while computing hash
|
||||
:raises: InvalidResponse if the hash doesn't match
|
||||
"""
|
||||
for chunk in iterable:
|
||||
hasher.update(chunk)
|
||||
yield chunk
|
||||
_verify_checksum(hasher, expected_hash, hash_algo)
|
||||
|
||||
|
||||
def _write_chunks(
|
||||
fd: io.IOBase, chunks: collections.abc.Iterable[bytes]
|
||||
) -> None:
|
||||
"""Write chunks to file descriptor."""
|
||||
for chunk in chunks:
|
||||
fd.write(chunk)
|
||||
|
||||
|
||||
class DownloadMixin:
|
||||
id: str
|
||||
base_path: str
|
||||
@@ -44,62 +87,86 @@ class DownloadMixin:
|
||||
): ...
|
||||
|
||||
def download(
|
||||
self,
|
||||
session,
|
||||
stream=False,
|
||||
output=None,
|
||||
chunk_size=1024 * 1024,
|
||||
self, session, stream=False, output=None, chunk_size=1024 * 1024
|
||||
):
|
||||
"""Download the data contained in an image"""
|
||||
# TODO(briancurtin): This method should probably offload the get
|
||||
# operation into another thread or something of that nature.
|
||||
"""Download the data contained in an image.
|
||||
|
||||
Checksum validation uses the hash algorithm metadata fields
|
||||
(hash_value + hash_algo) if available, otherwise falls back to MD5 via
|
||||
'checksum' or 'Content-MD5'. No validation is performed if neither is
|
||||
available.
|
||||
"""
|
||||
|
||||
# Fetch image metadata first to get hash info before downloading.
|
||||
# This prevents race conditions and the need for a second conditional
|
||||
# metadata retrieval if Content-MD5 is missing (story/1619675).
|
||||
details = self.fetch(session)
|
||||
meta_checksum = getattr(details, 'checksum', None)
|
||||
meta_hash_value = getattr(details, 'hash_value', None)
|
||||
meta_hash_algo = getattr(details, 'hash_algo', None)
|
||||
|
||||
url = utils.urljoin(self.base_path, self.id, 'file')
|
||||
resp = session.get(url, stream=stream)
|
||||
|
||||
# See the following bug report for details on why the checksum
|
||||
# code may sometimes depend on a second GET call.
|
||||
# https://storyboard.openstack.org/#!/story/1619675
|
||||
checksum = resp.headers.get("Content-MD5")
|
||||
hasher = None
|
||||
expected_hash = None
|
||||
hash_algo = None
|
||||
header_checksum = resp.headers.get("Content-MD5")
|
||||
|
||||
if checksum is None:
|
||||
# If we don't receive the Content-MD5 header with the download,
|
||||
# make an additional call to get the image details and look at
|
||||
# the checksum attribute.
|
||||
details = self.fetch(session)
|
||||
checksum = details.checksum
|
||||
if meta_hash_value and meta_hash_algo:
|
||||
try:
|
||||
hasher = hashlib.new(str(meta_hash_algo))
|
||||
expected_hash = meta_hash_value
|
||||
hash_algo = meta_hash_algo
|
||||
except ValueError as ve:
|
||||
if not str(ve).startswith('unsupported hash type'):
|
||||
raise exceptions.SDKException(
|
||||
f"Unsupported hash algorithm '{meta_hash_algo}': {ve}"
|
||||
)
|
||||
|
||||
# Fall back to MD5 from metadata or header
|
||||
if not hasher:
|
||||
md5_source = meta_checksum or header_checksum
|
||||
if md5_source:
|
||||
hasher = hashlib.md5(usedforsecurity=False)
|
||||
expected_hash = md5_source
|
||||
hash_algo = 'md5'
|
||||
|
||||
if hasher is None:
|
||||
session.log.warning(
|
||||
"Unable to verify the integrity of image %s "
|
||||
"- no hash available",
|
||||
self.id,
|
||||
)
|
||||
|
||||
md5 = hashlib.md5(usedforsecurity=False)
|
||||
if output:
|
||||
try:
|
||||
chunks = resp.iter_content(chunk_size=chunk_size)
|
||||
if hasher is not None:
|
||||
chunks = _integrity_iter(
|
||||
chunks, hasher, expected_hash, hash_algo
|
||||
)
|
||||
|
||||
if isinstance(output, io.IOBase):
|
||||
for chunk in resp.iter_content(chunk_size=chunk_size):
|
||||
output.write(chunk)
|
||||
md5.update(chunk)
|
||||
_write_chunks(output, chunks)
|
||||
else:
|
||||
with open(output, 'wb') as fd:
|
||||
for chunk in resp.iter_content(chunk_size=chunk_size):
|
||||
fd.write(chunk)
|
||||
md5.update(chunk)
|
||||
_verify_checksum(md5, checksum)
|
||||
_write_chunks(fd, chunks)
|
||||
|
||||
return resp
|
||||
except Exception as e:
|
||||
raise exceptions.SDKException(f"Unable to download image: {e}")
|
||||
# if we are returning the repsonse object, ensure that it
|
||||
# has the content-md5 header so that the caller doesn't
|
||||
# need to jump through the same hoops through which we
|
||||
# just jumped.
|
||||
|
||||
if stream:
|
||||
resp.headers['content-md5'] = checksum
|
||||
# Set content-md5 header for backward compatibility with callers
|
||||
# who expect hash info in the response when streaming
|
||||
if hash_algo == 'md5' and expected_hash:
|
||||
resp.headers['content-md5'] = expected_hash
|
||||
return resp
|
||||
|
||||
if checksum is not None:
|
||||
_verify_checksum(
|
||||
hashlib.md5(resp.content, usedforsecurity=False), checksum
|
||||
)
|
||||
else:
|
||||
session.log.warning(
|
||||
"Unable to verify the integrity of image %s", (self.id)
|
||||
)
|
||||
if hasher is not None:
|
||||
# Loads entire image into memory!
|
||||
hasher.update(resp.content)
|
||||
_verify_checksum(hasher, expected_hash, hash_algo)
|
||||
|
||||
return resp
|
||||
|
||||
@@ -247,14 +247,19 @@ def make_fake_image(
|
||||
checksum='ee36e35a297980dee1b514de9803ec6d',
|
||||
):
|
||||
if data:
|
||||
md5 = hashlib.md5(usedforsecurity=False)
|
||||
sha256 = hashlib.sha256()
|
||||
md5_hash = hashlib.md5(usedforsecurity=False)
|
||||
sha256_hash = hashlib.sha256()
|
||||
sha512_hash = hashlib.sha512()
|
||||
with open(data, 'rb') as file_obj:
|
||||
for chunk in iter(lambda: file_obj.read(8192), b''):
|
||||
md5.update(chunk)
|
||||
sha256.update(chunk)
|
||||
md5 = md5.hexdigest()
|
||||
sha256 = sha256.hexdigest()
|
||||
md5_hash.update(chunk)
|
||||
sha256_hash.update(chunk)
|
||||
sha512_hash.update(chunk)
|
||||
md5 = md5_hash.hexdigest()
|
||||
sha256 = sha256_hash.hexdigest()
|
||||
sha512 = sha512_hash.hexdigest()
|
||||
else:
|
||||
sha512 = None
|
||||
return {
|
||||
'image_state': 'available',
|
||||
'container_format': 'bare',
|
||||
@@ -282,6 +287,10 @@ def make_fake_image(
|
||||
'owner_specified.openstack.sha256': sha256 or NO_SHA256,
|
||||
'owner_specified.openstack.object': f'images/{image_name}',
|
||||
'protected': False,
|
||||
# Add secure hash fields (os_hash_algo and os_hash_value)
|
||||
# Default to sha512 if data was provided, otherwise None
|
||||
'os_hash_algo': 'sha512' if sha512 else None,
|
||||
'os_hash_value': sha512 if sha512 else None,
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -124,6 +124,11 @@ class TestImage(BaseTestImage):
|
||||
uri=f'https://image.example.com/v2/images?name={self.image_name}',
|
||||
json=self.fake_search_return,
|
||||
),
|
||||
dict(
|
||||
method='GET',
|
||||
uri=f'https://image.example.com/v2/images/{self.image_id}',
|
||||
json=self.fake_image_dict,
|
||||
),
|
||||
dict(
|
||||
method='GET',
|
||||
uri=f'https://image.example.com/v2/images/{self.image_id}/file',
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
import hashlib
|
||||
import io
|
||||
import operator
|
||||
import os
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
|
||||
@@ -381,72 +382,91 @@ class TestImage(base.TestCase):
|
||||
self.assertRaises(exceptions.SDKException, sot.stage, self.sess)
|
||||
|
||||
def test_download_checksum_match(self):
|
||||
sot = image.Image(**EXAMPLE)
|
||||
expected_hash = hashlib.sha512(b"abc").hexdigest()
|
||||
example_with_hash = EXAMPLE.copy()
|
||||
example_with_hash['os_hash_value'] = expected_hash
|
||||
sot = image.Image(**example_with_hash)
|
||||
|
||||
resp = FakeResponse(
|
||||
resp1 = FakeResponse(example_with_hash)
|
||||
resp2 = FakeResponse(
|
||||
b"abc",
|
||||
headers={
|
||||
"Content-MD5": "900150983cd24fb0d6963f7d28e17f72",
|
||||
"Content-Type": "application/octet-stream",
|
||||
},
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
)
|
||||
self.sess.get.return_value = resp
|
||||
|
||||
rv = sot.download(self.sess)
|
||||
self.sess.get.assert_called_with(
|
||||
'images/IDENTIFIER/file', stream=False
|
||||
)
|
||||
|
||||
self.assertEqual(rv, resp)
|
||||
|
||||
def test_download_checksum_mismatch(self):
|
||||
sot = image.Image(**EXAMPLE)
|
||||
|
||||
resp = FakeResponse(
|
||||
b"abc",
|
||||
headers={
|
||||
"Content-MD5": "the wrong checksum",
|
||||
"Content-Type": "application/octet-stream",
|
||||
},
|
||||
)
|
||||
self.sess.get.return_value = resp
|
||||
|
||||
self.assertRaises(exceptions.InvalidResponse, sot.download, self.sess)
|
||||
|
||||
def test_download_no_checksum_header(self):
|
||||
sot = image.Image(**EXAMPLE)
|
||||
|
||||
resp1 = FakeResponse(
|
||||
b"abc", headers={"Content-Type": "application/octet-stream"}
|
||||
)
|
||||
|
||||
resp2 = FakeResponse({"checksum": "900150983cd24fb0d6963f7d28e17f72"})
|
||||
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
rv = sot.download(self.sess)
|
||||
self.sess.get.assert_has_calls(
|
||||
[
|
||||
mock.call('images/IDENTIFIER/file', stream=False),
|
||||
mock.call(
|
||||
'images/IDENTIFIER',
|
||||
microversion=None,
|
||||
params={},
|
||||
skip_cache=False,
|
||||
),
|
||||
mock.call('images/IDENTIFIER/file', stream=False),
|
||||
]
|
||||
)
|
||||
|
||||
self.assertEqual(rv, resp1)
|
||||
self.assertEqual(rv, resp2)
|
||||
|
||||
def test_download_no_checksum_at_all2(self):
|
||||
sot = image.Image(**EXAMPLE)
|
||||
def test_download_checksum_mismatch(self):
|
||||
example_with_wrong_hash = EXAMPLE.copy()
|
||||
example_with_wrong_hash['os_hash_value'] = "wrong_hash_value"
|
||||
sot = image.Image(**example_with_wrong_hash)
|
||||
|
||||
resp1 = FakeResponse(
|
||||
resp1 = FakeResponse(example_with_wrong_hash)
|
||||
resp2 = FakeResponse(
|
||||
b"abc",
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
)
|
||||
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
self.assertRaises(exceptions.InvalidResponse, sot.download, self.sess)
|
||||
|
||||
def test_download_md5_fallback(self):
|
||||
expected_md5 = hashlib.md5(b"abc", usedforsecurity=False).hexdigest()
|
||||
example_md5_only = EXAMPLE.copy()
|
||||
example_md5_only['os_hash_algo'] = None
|
||||
example_md5_only['os_hash_value'] = None
|
||||
example_md5_only['checksum'] = expected_md5
|
||||
sot = image.Image(**example_md5_only)
|
||||
|
||||
resp1 = FakeResponse(example_md5_only)
|
||||
resp2 = FakeResponse(
|
||||
b"abc", headers={"Content-Type": "application/octet-stream"}
|
||||
)
|
||||
|
||||
resp2 = FakeResponse({"checksum": None})
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
rv = sot.download(self.sess)
|
||||
self.sess.get.assert_has_calls(
|
||||
[
|
||||
mock.call(
|
||||
'images/IDENTIFIER',
|
||||
microversion=None,
|
||||
params={},
|
||||
skip_cache=False,
|
||||
),
|
||||
mock.call('images/IDENTIFIER/file', stream=False),
|
||||
]
|
||||
)
|
||||
|
||||
self.assertEqual(rv, resp2)
|
||||
|
||||
def test_download_no_checksum_at_all2(self):
|
||||
# No hash available at all
|
||||
example_no_hash = EXAMPLE.copy()
|
||||
example_no_hash['os_hash_algo'] = None
|
||||
example_no_hash['os_hash_value'] = None
|
||||
example_no_hash['checksum'] = None
|
||||
sot = image.Image(**example_no_hash)
|
||||
|
||||
resp1 = FakeResponse(example_no_hash)
|
||||
resp2 = FakeResponse(
|
||||
b"abc", headers={"Content-Type": "application/octet-stream"}
|
||||
)
|
||||
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
@@ -457,74 +477,220 @@ class TestImage(base.TestCase):
|
||||
len(log.records), 1, "Too many warnings were logged"
|
||||
)
|
||||
self.assertEqual(
|
||||
"Unable to verify the integrity of image %s",
|
||||
"Unable to verify the integrity of image %s "
|
||||
"- no hash available",
|
||||
log.records[0].msg,
|
||||
)
|
||||
self.assertEqual((sot.id,), log.records[0].args)
|
||||
|
||||
self.sess.get.assert_has_calls(
|
||||
[
|
||||
mock.call('images/IDENTIFIER/file', stream=False),
|
||||
mock.call(
|
||||
'images/IDENTIFIER',
|
||||
microversion=None,
|
||||
params={},
|
||||
skip_cache=False,
|
||||
),
|
||||
mock.call('images/IDENTIFIER/file', stream=False),
|
||||
]
|
||||
)
|
||||
|
||||
self.assertEqual(rv, resp1)
|
||||
self.assertEqual(rv, resp2)
|
||||
|
||||
def test_download_stream(self):
|
||||
sot = image.Image(**EXAMPLE)
|
||||
expected_hash = hashlib.sha512(b"abc").hexdigest()
|
||||
example_with_hash = EXAMPLE.copy()
|
||||
example_with_hash['os_hash_value'] = expected_hash
|
||||
sot = image.Image(**example_with_hash)
|
||||
|
||||
resp = FakeResponse(
|
||||
resp1 = FakeResponse(example_with_hash)
|
||||
resp2 = FakeResponse(
|
||||
b"abc",
|
||||
headers={
|
||||
"Content-MD5": "900150983cd24fb0d6963f7d28e17f72",
|
||||
"Content-Type": "application/octet-stream",
|
||||
},
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
)
|
||||
self.sess.get.return_value = resp
|
||||
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
rv = sot.download(self.sess, stream=True)
|
||||
self.sess.get.assert_called_with('images/IDENTIFIER/file', stream=True)
|
||||
self.sess.get.assert_has_calls(
|
||||
[
|
||||
mock.call(
|
||||
'images/IDENTIFIER',
|
||||
microversion=None,
|
||||
params={},
|
||||
skip_cache=False,
|
||||
),
|
||||
mock.call('images/IDENTIFIER/file', stream=True),
|
||||
]
|
||||
)
|
||||
|
||||
self.assertEqual(rv, resp)
|
||||
self.assertEqual(rv, resp2)
|
||||
self.assertIsNone(rv.headers.get('content-md5'))
|
||||
|
||||
def test_image_download_output_fd(self):
|
||||
output_file = io.BytesIO()
|
||||
sot = image.Image(**EXAMPLE)
|
||||
expected_hash = hashlib.sha512(b'0102').hexdigest()
|
||||
example_with_hash = EXAMPLE.copy()
|
||||
example_with_hash['os_hash_value'] = expected_hash
|
||||
sot = image.Image(**example_with_hash)
|
||||
|
||||
fetch_response = FakeResponse(example_with_hash)
|
||||
response = mock.Mock()
|
||||
response.status_code = 200
|
||||
response.iter_content.return_value = [b'01', b'02']
|
||||
response.headers = {
|
||||
'Content-MD5': calculate_md5_checksum(
|
||||
response.iter_content.return_value
|
||||
)
|
||||
}
|
||||
self.sess.get = mock.Mock(return_value=response)
|
||||
response.headers = {}
|
||||
|
||||
self.sess.get = mock.Mock(side_effect=[fetch_response, response])
|
||||
sot.download(self.sess, output=output_file)
|
||||
output_file.seek(0)
|
||||
self.assertEqual(b'0102', output_file.read())
|
||||
|
||||
def test_image_download_output_file(self):
|
||||
sot = image.Image(**EXAMPLE)
|
||||
expected_hash = hashlib.sha512(b'0102').hexdigest()
|
||||
example_with_hash = EXAMPLE.copy()
|
||||
example_with_hash['os_hash_value'] = expected_hash
|
||||
sot = image.Image(**example_with_hash)
|
||||
|
||||
fetch_response = FakeResponse(example_with_hash)
|
||||
response = mock.Mock()
|
||||
response.status_code = 200
|
||||
response.iter_content.return_value = [b'01', b'02']
|
||||
response.headers = {
|
||||
'Content-MD5': calculate_md5_checksum(
|
||||
response.iter_content.return_value
|
||||
)
|
||||
}
|
||||
self.sess.get = mock.Mock(return_value=response)
|
||||
response.headers = {}
|
||||
|
||||
output_file = tempfile.NamedTemporaryFile()
|
||||
sot.download(self.sess, output=output_file.name)
|
||||
output_file.seek(0)
|
||||
self.assertEqual(b'0102', output_file.read())
|
||||
self.sess.get = mock.Mock(side_effect=[fetch_response, response])
|
||||
|
||||
output_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
output_file.close()
|
||||
try:
|
||||
sot.download(self.sess, output=output_file.name)
|
||||
with open(output_file.name, 'rb') as fd:
|
||||
self.assertEqual(b'0102', fd.read())
|
||||
finally:
|
||||
os.unlink(output_file.name)
|
||||
|
||||
def test_download_secure_hash_sha256(self):
|
||||
expected_hash = hashlib.sha256(b"abc").hexdigest()
|
||||
example_with_sha256 = EXAMPLE.copy()
|
||||
example_with_sha256['os_hash_algo'] = 'sha256'
|
||||
example_with_sha256['os_hash_value'] = expected_hash
|
||||
sot = image.Image(**example_with_sha256)
|
||||
|
||||
resp1 = FakeResponse(example_with_sha256)
|
||||
resp2 = FakeResponse(
|
||||
b"abc",
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
)
|
||||
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
rv = sot.download(self.sess)
|
||||
self.assertEqual(rv, resp2)
|
||||
|
||||
def test_download_secure_hash_sha384(self):
|
||||
expected_hash = hashlib.sha384(b"abc").hexdigest()
|
||||
example_with_sha384 = EXAMPLE.copy()
|
||||
example_with_sha384['os_hash_algo'] = 'sha384'
|
||||
example_with_sha384['os_hash_value'] = expected_hash
|
||||
sot = image.Image(**example_with_sha384)
|
||||
|
||||
resp1 = FakeResponse(example_with_sha384)
|
||||
resp2 = FakeResponse(
|
||||
b"abc",
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
)
|
||||
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
rv = sot.download(self.sess)
|
||||
self.assertEqual(rv, resp2)
|
||||
|
||||
def test_download_content_md5_header_ignored(self):
|
||||
correct_hash = hashlib.sha512(b"abc").hexdigest()
|
||||
example_with_hash = EXAMPLE.copy()
|
||||
example_with_hash['os_hash_value'] = correct_hash
|
||||
sot = image.Image(**example_with_hash)
|
||||
|
||||
resp1 = FakeResponse(example_with_hash)
|
||||
resp2 = FakeResponse(
|
||||
b"abc",
|
||||
headers={
|
||||
"Content-MD5": "wrong_header_hash_that_should_be_ignored",
|
||||
"Content-Type": "application/octet-stream",
|
||||
},
|
||||
)
|
||||
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
# Succeeds since only metadata hash is used
|
||||
rv = sot.download(self.sess)
|
||||
self.assertEqual(rv, resp2)
|
||||
|
||||
def test_download_secure_hash_mismatch_sha512(self):
|
||||
example_with_wrong_hash = EXAMPLE.copy()
|
||||
example_with_wrong_hash['os_hash_value'] = "wrong_sha512_hash"
|
||||
sot = image.Image(**example_with_wrong_hash)
|
||||
|
||||
resp1 = FakeResponse(example_with_wrong_hash)
|
||||
resp2 = FakeResponse(
|
||||
b"abc",
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
)
|
||||
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
self.assertRaises(exceptions.InvalidResponse, sot.download, self.sess)
|
||||
|
||||
def test_download_md5_fallback_mismatch(self):
|
||||
example_md5_only = EXAMPLE.copy()
|
||||
example_md5_only['os_hash_algo'] = None
|
||||
example_md5_only['os_hash_value'] = None
|
||||
example_md5_only['checksum'] = "wrong_md5_checksum"
|
||||
sot = image.Image(**example_md5_only)
|
||||
|
||||
resp1 = FakeResponse(example_md5_only)
|
||||
resp2 = FakeResponse(
|
||||
b"abc",
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
)
|
||||
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
self.assertRaises(exceptions.InvalidResponse, sot.download, self.sess)
|
||||
|
||||
def test_download_unsupported_hash_algo_raises(self):
|
||||
example_unsupported = EXAMPLE.copy()
|
||||
example_unsupported['os_hash_algo'] = 'unsupported_algo'
|
||||
example_unsupported['os_hash_value'] = 'some_hash_value'
|
||||
sot = image.Image(**example_unsupported)
|
||||
|
||||
resp1 = FakeResponse(example_unsupported)
|
||||
resp2 = FakeResponse(
|
||||
b"abc",
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
)
|
||||
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
self.assertRaises(exceptions.SDKException, sot.download, self.sess)
|
||||
|
||||
def test_download_unsupported_hash_algo_falls_back_to_md5(self):
|
||||
correct_md5 = hashlib.md5(b"abc", usedforsecurity=False).hexdigest()
|
||||
example_unsupported = EXAMPLE.copy()
|
||||
example_unsupported['os_hash_algo'] = 'ancient_hash_algo'
|
||||
example_unsupported['os_hash_value'] = 'irrelevant_hash'
|
||||
example_unsupported['checksum'] = correct_md5
|
||||
sot = image.Image(**example_unsupported)
|
||||
|
||||
resp1 = FakeResponse(example_unsupported)
|
||||
resp2 = FakeResponse(
|
||||
b"abc",
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
)
|
||||
|
||||
self.sess.get.side_effect = [resp1, resp2]
|
||||
|
||||
rv = sot.download(self.sess)
|
||||
self.assertEqual(rv, resp2)
|
||||
|
||||
def test_image_update(self):
|
||||
values = EXAMPLE.copy()
|
||||
|
||||
@@ -65,9 +65,9 @@ def make_names():
|
||||
print(imp)
|
||||
print('\n')
|
||||
print("class ServicesMixin:\n")
|
||||
for service in services: # type: ignore[assignment]
|
||||
if service:
|
||||
print(f" {service}")
|
||||
for attr in services:
|
||||
if attr:
|
||||
print(f" {attr}")
|
||||
else:
|
||||
print()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user