py3: Fix base64 usage

b64encode and b64decode returns a str in Python2 and a
bytes in Python3. This is a problem when using http/url/rest
libraries. Runtime errors are raised. Mixing str and bytes when
formatting a text might introduce an unwanted "b" for bytes, which can
lead to potential issues when sent over the network.

To keep compatibility use oslo_serialization to force the
return type to str.
There is one place where base64 urlsafe version is specifically used to
send and receive(Rest API for application upload).

One of the tests is that platform-integ-app applies which exercises part
of the changes.
Cert-mon and DC part will be exercied when DC is available on f/centos8
branch.

Story: 2006796
Task: 42797
Signed-off-by: Dan Voiculeasa <dan.voiculeasa@windriver.com>
Change-Id: I48b1c6c80363458945c6bc1a9cf7e16c743a7bd6
(cherry picked from commit 8a7c4b15c7)
This commit is contained in:
Dan Voiculeasa 2021-07-08 00:26:26 +03:00 committed by Charles Short
parent 3bc4153c17
commit 1518543bf7
9 changed files with 37 additions and 36 deletions

View File

@ -17,11 +17,11 @@
# of this software may be licensed only pursuant to the terms
# of an applicable Wind River license agreement.
#
import base64
from eventlet import greenthread
import greenlet
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import base64
from oslo_service import periodic_task
import time
@ -137,9 +137,9 @@ class CertificateMonManager(periodic_task.PeriodicTasks):
raise Exception('%s certificate data missing: %s'
% (subcloud_name, item))
txt_ssl_cert = base64.b64decode(secret.data['tls.crt'])
txt_ssl_key = base64.b64decode(secret.data['tls.key'])
txt_ca_cert = base64.b64decode(secret.data['ca.crt'])
txt_ssl_cert = base64.decode_as_text(secret.data['tls.crt'])
txt_ssl_key = base64.decode_as_text(secret.data['tls.key'])
txt_ca_cert = base64.decode_as_text(secret.data['ca.crt'])
except Exception as e:
LOG.error('Cannot audit ssl certificate on %s' % subcloud_name)
LOG.exception(e)

View File

@ -17,7 +17,6 @@
# of this software may be licensed only pursuant to the terms
# of an applicable Wind River license agreement.
#
import base64
import json
import os
import re
@ -32,6 +31,7 @@ from keystoneclient.auth.identity import v3
from oslo_config import cfg
from oslo_log import log
from oslo_utils import encodeutils
from oslo_serialization import base64
from six.moves.urllib.request import Request
from six.moves.urllib.error import HTTPError
from six.moves.urllib.error import URLError
@ -109,8 +109,8 @@ def verify_adminep_cert_chain():
raise Exception('%s tls.crt data missing'
% (constants.SC_ADMIN_ENDPOINT_SECRET_NAME))
txt_ca_crt = base64.b64decode(secret_ica.data['tls.crt'])
txt_tls_crt = base64.b64decode(secret_adminep.data['tls.crt'])
txt_ca_crt = base64.decode_as_text(secret_ica.data['tls.crt'])
txt_tls_crt = base64.decode_as_text(secret_adminep.data['tls.crt'])
with tempfile.NamedTemporaryFile() as ca_tmpfile:
ca_tmpfile.write(txt_ca_crt)

View File

@ -17,7 +17,6 @@
# of this software may be licensed only pursuant to the terms
# of an applicable Wind River license agreement.
#
import base64
import re
import hashlib
from datetime import datetime
@ -29,6 +28,7 @@ from kubernetes import config
import os
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import base64
from six.moves.urllib.error import URLError
from sysinv.cert_mon import utils
@ -93,11 +93,11 @@ class CertUpdateEventData(object):
self.tls_crt = None
self.tls_key = None
try:
self.ca_crt = base64.b64decode(data['ca.crt']).strip() \
self.ca_crt = base64.decode_as_text(data['ca.crt']).strip() \
if 'ca.crt' in data else ''
self.tls_crt = base64.b64decode(data['tls.crt']).strip() \
self.tls_crt = base64.decode_as_text(data['tls.crt']).strip() \
if 'tls.crt' in data else ''
self.tls_key = base64.b64decode(data['tls.key']).strip() \
self.tls_key = base64.decode_as_text(data['tls.key']).strip() \
if 'tls.key' in data else ''
except TypeError:
LOG.error('Invalid secret data.')

View File

@ -25,7 +25,6 @@
"""Utilities and helper functions."""
import ast
import base64
import boto3
from botocore.config import Config
import collections
@ -70,6 +69,7 @@ import netaddr
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import base64
from fm_api import constants as fm_constants
@ -1759,7 +1759,7 @@ def read_filtered_directory_content(dirpath, *filters):
# they can be transferred over RPC and stored in DB
content.decode('utf-8')
except UnicodeError:
content = content.encode('base64')
content = base64.encode_as_text(content)
content_dict['base64_encoded_files'] = \
content_dict.get("base64_encoded_files", []) + [filename]
@ -2578,7 +2578,7 @@ def get_aws_ecr_registry_credentials(dbapi, registry, username, password):
response = client.get_authorization_token()
token = response['authorizationData'][0]['authorizationToken']
username, password = token.decode('base64').split(':')
username, password = base64.decode_as_text(token).split(':')
except Exception as e:
raise exception.SysinvException(_(
"Failed to get AWS ECR credentials: %s" % e))
@ -2602,7 +2602,7 @@ def extract_ca_private_key_bytes_from_pem(pem_content):
raise exception.InvalidKubernetesCA
end_search += len(constants.END_PRIVATE_KEY_MARKER)
base64_key = base64.b64encode(pem_content[begin_search:end_search])
base64_key = base64.encode_as_text(pem_content[begin_search:end_search])
return base64_key
@ -2621,7 +2621,7 @@ def extract_ca_crt_bytes_from_pem(pem_content):
raise exception.InvalidKubernetesCA
end_search += len(constants.END_CERTIFICATE_MARKER)
base64_crt = base64.b64encode(pem_content[begin_search:end_search])
base64_crt = base64.encode_as_text(pem_content[begin_search:end_search])
return base64_crt
@ -2924,8 +2924,8 @@ def get_admin_ep_cert(dc_role):
raise Exception("Invalid admin endpoint certificate data.")
try:
tls_crt = base64.b64decode(data['tls.crt'])
tls_key = base64.b64decode(data['tls.key'])
tls_crt = base64.decode_as_text(data['tls.crt'])
tls_key = base64.decode_as_text(data['tls.key'])
except TypeError:
raise Exception('admin endpoint secret is invalid %s' %
endpoint_cert_secret_name)
@ -2944,7 +2944,7 @@ def get_admin_ep_cert(dc_role):
LOG.error('Cannot read DC root CA certificate %s' % e)
elif dc_role == constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER:
try:
ca_crt = base64.b64decode(data['ca.crt'])
ca_crt = base64.decode_as_text(data['ca.crt'])
except TypeError:
raise Exception('admin endpoint secret is invalid %s' %
endpoint_cert_secret_name)
@ -3001,7 +3001,7 @@ def get_root_ca_cert():
data = secret.data
try:
ca_crt = base64.b64decode(data['ca.crt'])
ca_crt = base64.decode_as_text(data['ca.crt'])
except TypeError:
raise Exception('Secret is invalid %s' % secret_name)

View File

@ -4,10 +4,10 @@
# SPDX-License-Identifier: Apache-2.0
#
import base64
import keyring
import requests
from oslo_serialization import base64
from sysinv.common import constants
from sysinv.common import exception
@ -68,7 +68,7 @@ def docker_registry_authenticate(www_authenticate):
# make a request to the token server
# the credentials are passed as a header while the rest
# are passed as params
auth_string = base64.b64encode("%s:%s" % (REGISTRY_USERNAME, get_registry_password()))
auth_string = base64.encode_as_text("%s:%s" % (REGISTRY_USERNAME, get_registry_password()))
token_server_request_headers = {"authorization": "Basic %s" % auth_string}
# we need try twice.
# SYSTEM_CERT_PATH if the docker registry cert is signed by a trusted CA

View File

@ -9,7 +9,6 @@
""" System Inventory Kubernetes Application Operator."""
import base64
import copy
import docker
from eventlet.green import subprocess
@ -38,6 +37,7 @@ from eventlet import Timeout
from fm_api import constants as fm_constants
from fm_api import fm_api
from oslo_log import log as logging
from oslo_serialization import base64
from sysinv._i18n import _
from sysinv.api.controllers.v1 import kube_app
from sysinv.common import constants
@ -983,12 +983,12 @@ class AppOperator(object):
secret = self._kube.kube_get_secret("registry-local-secret", kubernetes.NAMESPACE_KUBE_SYSTEM)
if secret is None:
return
secret_auth_body = base64.b64decode(secret.data['.dockerconfigjson'])
secret_auth_body = base64.decode_as_text(secret.data['.dockerconfigjson'])
secret_auth_info = (secret_auth_body.split('auth":')[1]).split('"')[1]
registry_auth = cutils.get_local_docker_registry_auth()
registry_auth_info = '{0}:{1}'.format(registry_auth['username'],
registry_auth['password'])
if secret_auth_info == base64.b64encode(registry_auth_info):
if secret_auth_info == base64.encode_as_text(registry_auth_info):
LOG.debug("Auth info is the same, no update is needed for k8s secret.")
return
except Exception as e:
@ -997,8 +997,8 @@ class AppOperator(object):
try:
# update secret with new auth info
token = '{{\"auths\": {{\"{0}\": {{\"auth\": \"{1}\"}}}}}}'.format(
constants.DOCKER_REGISTRY_SERVER, base64.b64encode(registry_auth_info))
secret.data['.dockerconfigjson'] = base64.b64encode(token)
constants.DOCKER_REGISTRY_SERVER, base64.encode_as_text(registry_auth_info))
secret.data['.dockerconfigjson'] = base64.encode_as_text(token)
self._kube.kube_patch_secret("registry-local-secret", kubernetes.NAMESPACE_KUBE_SYSTEM, secret)
LOG.info("Secret registry-local-secret under Namespace kube-system is updated")
except Exception as e:
@ -1015,9 +1015,9 @@ class AppOperator(object):
continue
try:
secret_auth_body = base64.b64decode(secret.data['.dockerconfigjson'])
secret_auth_body = base64.decode_as_text(secret.data['.dockerconfigjson'])
if constants.DOCKER_REGISTRY_SERVER in secret_auth_body:
secret.data['.dockerconfigjson'] = base64.b64encode(token)
secret.data['.dockerconfigjson'] = base64.encode_as_text(token)
self._kube.kube_patch_secret(AppOperator.DOCKER_REGISTRY_SECRET, ns, secret)
LOG.info("Secret %s under Namespace %s is updated"
% (AppOperator.DOCKER_REGISTRY_SECRET, ns))

View File

@ -29,7 +29,6 @@ collection of inventory data for each host.
"""
import base64
import errno
import filecmp
import fnmatch
@ -70,6 +69,7 @@ from netaddr import IPAddress
from netaddr import IPNetwork
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import base64
from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import timeutils
@ -12410,8 +12410,8 @@ class ConductorManager(service.PeriodicService):
sc_endpoint_cert_secret_ns, sc_intermediate_ca_secret_name
))
tls_key = base64.b64encode(sc_ca_key)
tls_crt = base64.b64encode(sc_ca_cert)
tls_key = base64.encode_as_text(sc_ca_key)
tls_crt = base64.encode_as_text(sc_ca_cert)
if tls_key == secret.data['tls.key'] and tls_crt == secret.data['tls.crt']:
LOG.info('Intermediate CA cert is not changed')
return
@ -14358,7 +14358,7 @@ class ConductorManager(service.PeriodicService):
return dict(success="", error=msg)
data = secret.data
tls_crt = base64.b64decode(data['tls.crt'])
tls_crt = base64.decode_as_bytes(data['tls.crt'])
cert = cutils.extract_certs_from_pem(tls_crt)[0]
# extract information regarding the new rootca

View File

@ -6,6 +6,7 @@
import keyring
import os
from oslo_serialization import base64
from sysinv.common import constants
from sysinv.common import exception
from sysinv.common import utils
@ -522,7 +523,7 @@ class PlatformPuppet(base.BasePuppet):
# convert these back to their native encoding
encoded_files = tpm_data.pop("base64_encoded_files", [])
for binary in encoded_files:
tpm_data[binary] = tpm_data[binary].decode('base64')
tpm_data[binary] = base64.decode_as_text(tpm_data[binary])
config.update({
'platform::tpm::tpm_data': tpm_data
})

View File

@ -22,7 +22,6 @@
"""Test class for Sysinv ManagerService."""
import base64
import mock
import os.path
import tsconfig.tsconfig as tsc
@ -31,6 +30,7 @@ import uuid
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from oslo_serialization import base64
from sysinv.agent import rpcapi as agent_rpcapi
from sysinv.common import constants
from sysinv.common import device as dconstants
@ -48,7 +48,7 @@ from sysinv.tests.db import utils
class FakeSecret(object):
def __init__(self, crt):
self.data = {'tls.crt': base64.b64encode(crt)}
self.data = {'tls.crt': base64.encode_as_text(crt)}
class FakeCephOperator(object):