Merge "Remove redundant code"

This commit is contained in:
Zuul 2024-09-12 22:25:39 +00:00 committed by Gerrit Code Review
commit 4eea48a48a
5 changed files with 9 additions and 115 deletions

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import io
from openstack import exceptions
@ -68,7 +69,7 @@ class DownloadMixin:
details = self.fetch(session)
checksum = details.checksum
md5 = utils.md5(usedforsecurity=False)
md5 = hashlib.md5(usedforsecurity=False)
if output:
try:
if isinstance(output, io.IOBase):
@ -97,7 +98,7 @@ class DownloadMixin:
if checksum is not None:
_verify_checksum(
utils.md5(resp.content, usedforsecurity=False), checksum
hashlib.md5(resp.content, usedforsecurity=False), checksum
)
else:
session.log.warning(

View File

@ -24,7 +24,6 @@ import uuid
from openstack.cloud import meta
from openstack.orchestration.util import template_format
from openstack import utils
PROJECT_ID = '1c36b64c840a42cd9e9b931a369337f0'
FLAVOR_ID = '0c1d9008-f546-4608-9e8f-f8bdaec8dddd'
@ -256,7 +255,7 @@ def make_fake_image(
checksum='ee36e35a297980dee1b514de9803ec6d',
):
if data:
md5 = utils.md5(usedforsecurity=False)
md5 = hashlib.md5(usedforsecurity=False)
sha256 = hashlib.sha256()
with open(data, 'rb') as file_obj:
for chunk in iter(lambda: file_obj.read(8192), b''):

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import io
import operator
import tempfile
@ -22,7 +23,6 @@ from openstack import _log
from openstack import exceptions
from openstack.image.v2 import image
from openstack.tests.unit import base
from openstack import utils
IDENTIFIER = 'IDENTIFIER'
EXAMPLE = {
@ -93,7 +93,7 @@ EXAMPLE = {
def calculate_md5_checksum(data):
checksum = utils.md5(usedforsecurity=False)
checksum = hashlib.md5(usedforsecurity=False)
for chunk in data:
checksum.update(chunk)
return checksum.hexdigest()

View File

@ -11,7 +11,6 @@
# under the License.
import concurrent.futures
import hashlib
import logging
import sys
from unittest import mock
@ -301,87 +300,3 @@ class TestTinyDAG(base.TestCase):
def test_walker_fn(graph, node, lst):
lst.append(node)
graph.node_done(node)
class Test_md5(base.TestCase):
def setUp(self):
super().setUp()
self.md5_test_data = b"Openstack forever"
try:
self.md5_digest = hashlib.md5( # nosec
self.md5_test_data
).hexdigest()
self.fips_enabled = False
except ValueError:
self.md5_digest = '0d6dc3c588ae71a04ce9a6beebbbba06'
self.fips_enabled = True
def test_md5_with_data(self):
if not self.fips_enabled:
digest = utils.md5(self.md5_test_data).hexdigest()
self.assertEqual(digest, self.md5_digest)
else:
# on a FIPS enabled system, this throws a ValueError:
# [digital envelope routines: EVP_DigestInit_ex] disabled for FIPS
self.assertRaises(ValueError, utils.md5, self.md5_test_data)
if not self.fips_enabled:
digest = utils.md5(
self.md5_test_data, usedforsecurity=True
).hexdigest()
self.assertEqual(digest, self.md5_digest)
else:
self.assertRaises(
ValueError, utils.md5, self.md5_test_data, usedforsecurity=True
)
digest = utils.md5(
self.md5_test_data, usedforsecurity=False
).hexdigest()
self.assertEqual(digest, self.md5_digest)
def test_md5_without_data(self):
if not self.fips_enabled:
test_md5 = utils.md5()
test_md5.update(self.md5_test_data)
digest = test_md5.hexdigest()
self.assertEqual(digest, self.md5_digest)
else:
self.assertRaises(ValueError, utils.md5)
if not self.fips_enabled:
test_md5 = utils.md5(usedforsecurity=True)
test_md5.update(self.md5_test_data)
digest = test_md5.hexdigest()
self.assertEqual(digest, self.md5_digest)
else:
self.assertRaises(ValueError, utils.md5, usedforsecurity=True)
test_md5 = utils.md5(usedforsecurity=False)
test_md5.update(self.md5_test_data)
digest = test_md5.hexdigest()
self.assertEqual(digest, self.md5_digest)
def test_string_data_raises_type_error(self):
if not self.fips_enabled:
self.assertRaises(TypeError, hashlib.md5, 'foo')
self.assertRaises(TypeError, utils.md5, 'foo')
self.assertRaises(
TypeError, utils.md5, 'foo', usedforsecurity=True
)
else:
self.assertRaises(ValueError, hashlib.md5, 'foo')
self.assertRaises(ValueError, utils.md5, 'foo')
self.assertRaises(
ValueError, utils.md5, 'foo', usedforsecurity=True
)
self.assertRaises(TypeError, utils.md5, 'foo', usedforsecurity=False)
def test_none_data_raises_type_error(self):
if not self.fips_enabled:
self.assertRaises(TypeError, hashlib.md5, None)
self.assertRaises(TypeError, utils.md5, None)
self.assertRaises(TypeError, utils.md5, None, usedforsecurity=True)
else:
self.assertRaises(ValueError, hashlib.md5, None)
self.assertRaises(ValueError, utils.md5, None)
self.assertRaises(
ValueError, utils.md5, None, usedforsecurity=True
)
self.assertRaises(TypeError, utils.md5, None, usedforsecurity=False)

View File

@ -287,11 +287,11 @@ def maximum_supported_microversion(adapter, client_maximum):
def _hashes_up_to_date(md5, sha256, md5_key, sha256_key):
'''Compare md5 and sha256 hashes for being up to date
"""Compare md5 and sha256 hashes for being up to date
md5 and sha256 are the current values.
md5_key and sha256_key are the previous values.
'''
"""
up_to_date = False
if md5 and md5_key == md5:
up_to_date = True
@ -304,29 +304,8 @@ def _hashes_up_to_date(md5, sha256, md5_key, sha256_key):
return up_to_date
try:
_test_md5 = hashlib.md5(usedforsecurity=False) # nosec
# Python distributions that support a hashlib.md5 with the usedforsecurity
# keyword can just use that md5 definition as-is
# See https://bugs.python.org/issue9216
#
# TODO(alee) Remove this wrapper when the minimum python version is bumped
# to 3.9 (which is the first upstream version to support this keyword)
# See https://docs.python.org/3.9/library/hashlib.html
md5 = hashlib.md5
except TypeError:
def md5(string=b'', usedforsecurity=True):
"""Return an md5 hashlib object without usedforsecurity parameter
For python distributions that do not yet support this keyword
parameter, we drop the parameter
"""
return hashlib.md5(string) # nosec
def _calculate_data_hashes(data):
_md5 = md5(usedforsecurity=False)
_md5 = hashlib.md5(usedforsecurity=False)
_sha256 = hashlib.sha256()
if hasattr(data, 'read'):