# Copyright 2020 Red Hat, inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import base64
import math
import os
import re
import subprocess

import requests


def get_default(config, section, option, default=None, expand_user=False):
    """Read ``option`` from ``section`` of ``config``, or return ``default``.

    When the option exists, the type of ``default`` selects the getter:
    ``getboolean`` for a bool default, ``getint`` for an int default and
    plain ``get`` otherwise.

    :param config: object exposing ``has_option``, ``get``, ``getint`` and
        ``getboolean`` (e.g. a ``configparser.ConfigParser``).
    :param section: name of the section to look in.
    :param option: name of the option to read.
    :param default: value returned when the option is absent.
    :param expand_user: if true and the resulting value is truthy, pass it
        through ``os.path.expanduser`` before returning it.
    :returns: the configured value, or ``default``.
    """
    if config.has_option(section, option):
        # Need to be ensured that we get suitable
        # type from config file by default value.
        # bool must be tested before int: bool is a subclass of int.
        if isinstance(default, bool):
            value = config.getboolean(section, option)
        elif isinstance(default, int):
            value = config.getint(section, option)
        else:
            value = config.get(section, option)
    else:
        value = default
    if expand_user and value:
        return os.path.expanduser(value)
    return value


def encrypt_with_openssl(pubkey_path, plaintext, logger=None):
    """Encrypt ``plaintext`` with an RSA public key via the ``openssl`` CLI.

    The key length is read from the output of ``openssl rsa -text`` and the
    plaintext is split into chunks small enough for PKCS#1 OAEP; each chunk
    is encrypted with ``openssl pkeyutl`` and base64-encoded.

    :param pubkey_path: path of the public key file handed to ``openssl``.
    :param plaintext: the text (``str``) to encrypt.
    :param logger: optional logger; ``debug`` and ``info`` are called on it.
    :returns: list of base64-encoded ciphertext chunks (``str``), in order.
    :raises Exception: if ``openssl`` is not installed, exits with a
        non-zero return code, reports no recognizable key length, or the
        key is too short to hold any OAEP payload.
    """
    cmd = ['openssl', 'version']
    if logger:
        logger.debug('Invoking "%s"' % ' '.join(cmd))
    try:
        openssl_version = subprocess.check_output(
            cmd).split()[1]
    except FileNotFoundError as err:
        raise Exception('"openssl" is not installed on the system') from err

    cmd = ['openssl', 'rsa', '-text', '-pubin', '-in', pubkey_path]
    if logger:
        logger.debug('Invoking "%s"' % ' '.join(cmd))
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    (stdout, stderr) = p.communicate()
    if p.returncode != 0:
        raise Exception('openssl failure (Return code %s)' % p.returncode)
    output = stdout.decode('utf-8')

    # The header line announcing the key size differs between the 0.x
    # series and later releases of openssl.
    if openssl_version.startswith(b'0.'):
        key_length_re = r'^Modulus \((?P<key_length>\d+) bit\):$'
    else:
        key_length_re = r'^(|RSA )Public-Key: \((?P<key_length>\d+) bit\)$'
    # re.search + MULTILINE finds the header on any line of the output;
    # re.match would only ever inspect the very first line.
    m = re.search(key_length_re, output, re.MULTILINE)
    if m is None:
        raise Exception(
            'Unable to determine the public key length from openssl output')
    nbits = int(m.group('key_length'))
    nbytes = nbits // 8
    max_bytes = nbytes - 42  # PKCS1-OAEP overhead
    if max_bytes <= 0:
        # Without this guard the chunk count below would divide by zero
        # or go negative and silently produce no ciphertext at all.
        raise Exception(
            'Public key too short for PKCS1-OAEP (%s bits)' % nbits)
    chunks = int(math.ceil(float(len(plaintext)) / max_bytes))

    ciphertext_chunks = []

    if logger:
        logger.info(
            'Public key length: {} bits ({} bytes)'.format(nbits, nbytes))
        logger.info(
            'Max plaintext length per chunk: {} bytes'.format(max_bytes))
        logger.info(
            'Input plaintext length: {} bytes'.format(len(plaintext)))
        logger.info('Number of chunks: {}'.format(chunks))

    # NOTE(ianw) 2023-03-29 : previously this used the deprecated
    # rsautl tool, which hardcoded sha1 as the oaep hash; so zuul
    # assumes that on decryption.  Be careful modifying it.
    cmd = ['openssl', 'pkeyutl', '-encrypt', '-pubin', '-inkey', pubkey_path,
           '-pkeyopt', 'rsa_padding_mode:oaep',
           '-pkeyopt', 'rsa_oaep_md:sha1']
    if logger:
        logger.debug('Invoking "%s" with each data chunk:' % ' '.join(cmd))
    for count in range(chunks):
        # Chunks are sliced from the text (not from its encoded bytes), so
        # a chunk never ends in the middle of a multi-byte character.
        chunk = plaintext[int(count * max_bytes):
                          int((count + 1) * max_bytes)]
        p = subprocess.Popen(cmd,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE)
        if logger:
            logger.debug('\tchunk %s' % (count + 1))
        (stdout, stderr) = p.communicate(str.encode(chunk))
        if p.returncode != 0:
            raise Exception('openssl failure (Return code %s)' %
                            p.returncode)
        ciphertext_chunks.append(base64.b64encode(stdout).decode('utf-8'))
    return ciphertext_chunks


def get_oidc_config(authority, verify=True):
    """Fetch the OpenID Connect discovery document of ``authority``.

    :param authority: base URL of the authority; a trailing ``/`` is added
        when missing before appending
        ``.well-known/openid-configuration``.
    :param verify: forwarded to ``requests.get`` as ``verify``.
    :returns: the decoded JSON body of the response.
    :raises requests.HTTPError: via ``raise_for_status`` on an error status.
    """
    _authority = authority
    if not _authority.endswith('/'):
        _authority += '/'
    oidc_config = requests.get(
        _authority + '.well-known/openid-configuration',
        verify=verify
    )
    oidc_config.raise_for_status()
    return oidc_config.json()


def is_direct_grant_allowed(oidc_config):
    """Return whether ``'password'`` is among ``grant_types_supported``.

    :param oidc_config: mapping such as the one returned by
        ``get_oidc_config``; a missing key counts as an empty list.
    """
    return 'password' in oidc_config.get('grant_types_supported', [])


def get_token(username, password, client_id, oidc_config,
              scope=None, verify=True):
    """Request an access token using the ``password`` grant type.

    :param username: sent as the ``username`` form field.
    :param password: sent as the ``password`` form field.
    :param client_id: sent as the ``client_id`` form field.
    :param oidc_config: mapping providing the ``token_endpoint`` URL.
    :param scope: optional value for the ``scope`` form field; omitted
        from the request when falsy.
    :param verify: forwarded to ``requests.post`` as ``verify``.
    :returns: the ``access_token`` entry of the JSON response.
    :raises requests.HTTPError: via ``raise_for_status`` on an error status.
    """
    token_endpoint = oidc_config.get('token_endpoint')
    _data = {
        'client_id': client_id,
        'username': username,
        'password': password,
        'grant_type': 'password',
    }
    if scope:
        _data['scope'] = scope
    response = requests.post(
        token_endpoint,
        verify=verify,
        data=_data
    )
    response.raise_for_status()
    token = response.json()['access_token']
    return token