Merge "Switch back to built-in md5 function"

This commit is contained in:
Zuul 2024-10-28 09:19:47 +00:00 committed by Gerrit Code Review
commit b2bc5299b5
4 changed files with 23 additions and 19 deletions

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import io
import os
import re
@ -24,7 +25,6 @@ import flask
import jinja2
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils.secretutils import md5
import webob
from werkzeug import exceptions
@ -55,7 +55,7 @@ SYSTEMD_TEMPLATE = JINJA_ENV.get_template(SYSTEMD_CONF)
class Wrapped:
def __init__(self, stream_):
self.stream = stream_
self.hash = md5(usedforsecurity=False) # nosec
self.hash = hashlib.md5(usedforsecurity=False) # nosec
def read(self, line):
block = self.stream.read(line)
@ -82,8 +82,8 @@ class Loadbalancer:
cfg = file.read()
resp = webob.Response(cfg, content_type='text/plain')
resp.headers['ETag'] = (
md5(octavia_utils.b(cfg),
usedforsecurity=False).hexdigest()) # nosec
hashlib.md5(octavia_utils.b(cfg),
usedforsecurity=False).hexdigest()) # nosec
return resp
def upload_haproxy_config(self, amphora_id, lb_id):
@ -408,8 +408,8 @@ class Loadbalancer:
with open(cert_path, encoding='utf-8') as crt_file:
cert = crt_file.read()
md5sum = md5(octavia_utils.b(cert),
usedforsecurity=False).hexdigest() # nosec
md5sum = hashlib.md5(octavia_utils.b(cert),
usedforsecurity=False).hexdigest() # nosec
resp = webob.Response(json={'md5sum': md5sum})
resp.headers['ETag'] = md5sum
return resp

View File

@ -23,7 +23,6 @@ import warnings
from oslo_context import context as oslo_context
from oslo_log import log as logging
from oslo_utils.secretutils import md5
import requests
from stevedore import driver as stevedore_driver
@ -450,7 +449,8 @@ class HaproxyAmphoraLoadBalancerDriver(
if amphora and obj_id:
for cert in certs:
pem = cert_parser.build_pem(cert)
md5sum = md5(pem, usedforsecurity=False).hexdigest() # nosec
md5sum = hashlib.md5(
pem, usedforsecurity=False).hexdigest() # nosec
name = f'{cert.id}.pem'
cert_filename_list.append(
os.path.join(
@ -461,8 +461,8 @@ class HaproxyAmphoraLoadBalancerDriver(
# Build and upload the crt-list file for haproxy
crt_list = "\n".join(cert_filename_list)
crt_list = f'{crt_list}\n'.encode()
md5sum = md5(crt_list,
usedforsecurity=False).hexdigest() # nosec
md5sum = hashlib.md5(
crt_list, usedforsecurity=False).hexdigest() # nosec
name = f'{listener.id}.pem'
self._upload_cert(amphora, obj_id, crt_list, md5sum, name)
return {'tls_cert': tls_cert, 'sni_certs': sni_certs}
@ -480,7 +480,8 @@ class HaproxyAmphoraLoadBalancerDriver(
secret = secret.encode('utf-8')
except AttributeError:
pass
md5sum = md5(secret, usedforsecurity=False).hexdigest() # nosec
md5sum = hashlib.md5(
secret, usedforsecurity=False).hexdigest() # nosec
id = hashlib.sha1(secret).hexdigest() # nosec
name = f'{id}.pem'
@ -519,7 +520,8 @@ class HaproxyAmphoraLoadBalancerDriver(
pem = pem.encode('utf-8')
except AttributeError:
pass
md5sum = md5(pem, usedforsecurity=False).hexdigest() # nosec
md5sum = hashlib.md5(
pem, usedforsecurity=False).hexdigest() # nosec
name = f'{tls_cert.id}.pem'
if amphora and obj_id:
self._upload_cert(amphora, obj_id, pem=pem,

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import os
import random
import socket
@ -22,7 +23,6 @@ from unittest import mock
import fixtures
from oslo_config import fixture as oslo_fixture
from oslo_serialization import jsonutils
from oslo_utils.secretutils import md5
from oslo_utils import uuidutils
import webob
@ -740,9 +740,10 @@ class TestServerTestCase(base.TestCase):
rv = self.centos_app.get('/' + api_server.VERSION +
'/loadbalancer/123/certificates/test.pem')
self.assertEqual(200, rv.status_code)
self.assertEqual(dict(md5sum=md5(octavia_utils.b(CONTENT),
usedforsecurity=False).hexdigest()),
jsonutils.loads(rv.data.decode('utf-8')))
self.assertEqual(
dict(md5sum=hashlib.md5(octavia_utils.b(CONTENT),
usedforsecurity=False).hexdigest()),
jsonutils.loads(rv.data.decode('utf-8')))
def test_ubuntu_upload_certificate_md5(self):
self._test_upload_certificate_md5(consts.UBUNTU)

View File

@ -17,7 +17,6 @@ from unittest import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils.secretutils import md5
from oslo_utils import uuidutils
import requests
import requests_mock
@ -354,7 +353,8 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
mock_oslo.return_value = fake_context
self.driver.cert_manager.get_secret.reset_mock()
self.driver.cert_manager.get_secret.return_value = fake_secret
ref_md5 = md5(fake_secret, usedforsecurity=False).hexdigest() # nosec
ref_md5 = hashlib.md5(
fake_secret, usedforsecurity=False).hexdigest() # nosec
ref_id = hashlib.sha1(fake_secret).hexdigest() # nosec
ref_name = f'{ref_id}.pem'
@ -418,7 +418,8 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
mock_load_certs.return_value = pool_data
fake_pem = b'fake pem'
mock_build_pem.return_value = fake_pem
ref_md5 = md5(fake_pem, usedforsecurity=False).hexdigest() # nosec
ref_md5 = hashlib.md5(
fake_pem, usedforsecurity=False).hexdigest() # nosec
ref_name = f'{pool_cert.id}.pem'
ref_path = (f'{fake_cert_dir}/{sample_listener.load_balancer.id}/'
f'{ref_name}')