crypto: Remove unused functions
These top-level functions were not called from anywhere but tests. - fetch_ca - ensure_ca_filesystem - fetch_crl - decrypt_text - revoke_certs_by_user - revoke_certs_by_project - revoke_certs_by_user_and_project - generate_x509_cert - generate_vpn_files These other functions are used by the above and are no longer used anywhere. - ca_folder - ca_path - key_path - crl_path - revoke_cert - _project_cert_subject - _user_cert_subject - _ensure_project_folder - sign_csr - _sign_csr Tests for these are removed as are a number of scripts found in 'nova/CA', which were only used by the aforementioned functions. Change-Id: Ie1dadc6bf935f777e0cd0c54a0a21b79545714c5
This commit is contained in:
parent
130777ab0e
commit
ddb2b028f1
208
nova/crypto.py
208
nova/crypto.py
@ -17,7 +17,6 @@
|
||||
"""Wrappers around standard crypto data elements.
|
||||
|
||||
Includes root and intermediate CAs, SSH key_pairs and x509 certificates.
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import absolute_import
|
||||
@ -26,24 +25,18 @@ import base64
|
||||
import binascii
|
||||
import os
|
||||
|
||||
from cryptography import exceptions
|
||||
from cryptography.hazmat import backends
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography import x509
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import fileutils
|
||||
import paramiko
|
||||
import six
|
||||
|
||||
import nova.conf
|
||||
from nova import context
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova.i18n import _, _LE
|
||||
from nova.i18n import _
|
||||
from nova import utils
|
||||
|
||||
|
||||
@ -52,45 +45,6 @@ LOG = logging.getLogger(__name__)
|
||||
CONF = nova.conf.CONF
|
||||
|
||||
|
||||
def ca_folder(project_id=None):
|
||||
if CONF.crypto.use_project_ca and project_id:
|
||||
return os.path.join(CONF.crypto.ca_path, 'projects', project_id)
|
||||
return CONF.crypto.ca_path
|
||||
|
||||
|
||||
def ca_path(project_id=None):
|
||||
return os.path.join(ca_folder(project_id), CONF.crypto.ca_file)
|
||||
|
||||
|
||||
def key_path(project_id=None):
|
||||
return os.path.join(ca_folder(project_id), CONF.crypto.key_file)
|
||||
|
||||
|
||||
def crl_path(project_id=None):
|
||||
return os.path.join(ca_folder(project_id), CONF.crypto.crl_file)
|
||||
|
||||
|
||||
def fetch_ca(project_id=None):
|
||||
if not CONF.crypto.use_project_ca:
|
||||
project_id = None
|
||||
ca_file_path = ca_path(project_id)
|
||||
if not os.path.exists(ca_file_path):
|
||||
raise exception.CryptoCAFileNotFound(project=project_id)
|
||||
with open(ca_file_path, 'r') as cafile:
|
||||
return cafile.read()
|
||||
|
||||
|
||||
def ensure_ca_filesystem():
|
||||
"""Ensure the CA filesystem exists."""
|
||||
ca_dir = ca_folder()
|
||||
if not os.path.exists(ca_path()):
|
||||
genrootca_sh_path = os.path.abspath(
|
||||
os.path.join(os.path.dirname(__file__), 'CA', 'genrootca.sh'))
|
||||
|
||||
fileutils.ensure_tree(ca_dir)
|
||||
utils.execute("sh", genrootca_sh_path, cwd=ca_dir)
|
||||
|
||||
|
||||
def generate_fingerprint(public_key):
|
||||
try:
|
||||
pub_bytes = public_key.encode('utf-8')
|
||||
@ -138,31 +92,6 @@ def generate_key_pair(bits=2048):
|
||||
return (private_key, public_key, fingerprint)
|
||||
|
||||
|
||||
def fetch_crl(project_id):
|
||||
"""Get crl file for project."""
|
||||
if not CONF.crypto.use_project_ca:
|
||||
project_id = None
|
||||
crl_file_path = crl_path(project_id)
|
||||
if not os.path.exists(crl_file_path):
|
||||
raise exception.CryptoCRLFileNotFound(project=project_id)
|
||||
with open(crl_file_path, 'r') as crlfile:
|
||||
return crlfile.read()
|
||||
|
||||
|
||||
def decrypt_text(project_id, text):
|
||||
private_key_file = key_path(project_id)
|
||||
if not os.path.exists(private_key_file):
|
||||
raise exception.ProjectNotFound(project_id=project_id)
|
||||
with open(private_key_file, 'rb') as f:
|
||||
data = f.read()
|
||||
try:
|
||||
priv_key = serialization.load_pem_private_key(
|
||||
data, None, backends.default_backend())
|
||||
return priv_key.decrypt(text, padding.PKCS1v15())
|
||||
except (ValueError, TypeError, exceptions.UnsupportedAlgorithm) as exc:
|
||||
raise exception.DecryptionFailure(reason=six.text_type(exc))
|
||||
|
||||
|
||||
def ssh_encrypt_text(ssh_public_key, text):
|
||||
"""Encrypt text with an ssh public key.
|
||||
|
||||
@ -179,79 +108,6 @@ def ssh_encrypt_text(ssh_public_key, text):
|
||||
raise exception.EncryptionFailure(reason=six.text_type(exc))
|
||||
|
||||
|
||||
def revoke_cert(project_id, file_name):
|
||||
"""Revoke a cert by file name."""
|
||||
try:
|
||||
# NOTE(vish): potential race condition here
|
||||
utils.execute('openssl', 'ca', '-config', './openssl.cnf', '-revoke',
|
||||
file_name, cwd=ca_folder(project_id))
|
||||
utils.execute('openssl', 'ca', '-gencrl', '-config', './openssl.cnf',
|
||||
'-out', CONF.crypto.crl_file, cwd=ca_folder(project_id))
|
||||
except OSError:
|
||||
raise exception.ProjectNotFound(project_id=project_id)
|
||||
except processutils.ProcessExecutionError:
|
||||
raise exception.RevokeCertFailure(project_id=project_id)
|
||||
|
||||
|
||||
def revoke_certs_by_user(user_id):
|
||||
"""Revoke all user certs."""
|
||||
admin = context.get_admin_context()
|
||||
for cert in db.certificate_get_all_by_user(admin, user_id):
|
||||
revoke_cert(cert['project_id'], cert['file_name'])
|
||||
|
||||
|
||||
def revoke_certs_by_project(project_id):
|
||||
"""Revoke all project certs."""
|
||||
# NOTE(vish): This is somewhat useless because we can just shut down
|
||||
# the vpn.
|
||||
admin = context.get_admin_context()
|
||||
for cert in db.certificate_get_all_by_project(admin, project_id):
|
||||
revoke_cert(cert['project_id'], cert['file_name'])
|
||||
|
||||
|
||||
def revoke_certs_by_user_and_project(user_id, project_id):
|
||||
"""Revoke certs for user in project."""
|
||||
admin = context.get_admin_context()
|
||||
for cert in db.certificate_get_all_by_user_and_project(admin,
|
||||
user_id, project_id):
|
||||
revoke_cert(cert['project_id'], cert['file_name'])
|
||||
|
||||
|
||||
def _project_cert_subject(project_id):
|
||||
"""Helper to generate user cert subject."""
|
||||
return CONF.crypto.project_cert_subject % (project_id, utils.isotime())
|
||||
|
||||
|
||||
def _user_cert_subject(user_id, project_id):
|
||||
"""Helper to generate user cert subject."""
|
||||
return CONF.crypto.user_cert_subject % (project_id, user_id,
|
||||
utils.isotime())
|
||||
|
||||
|
||||
def generate_x509_cert(user_id, project_id, bits=2048):
|
||||
"""Generate and sign a cert for user in project."""
|
||||
subject = _user_cert_subject(user_id, project_id)
|
||||
|
||||
with utils.tempdir() as tmpdir:
|
||||
keyfile = os.path.abspath(os.path.join(tmpdir, 'temp.key'))
|
||||
csrfile = os.path.abspath(os.path.join(tmpdir, 'temp.csr'))
|
||||
utils.execute('openssl', 'genrsa', '-out', keyfile, str(bits))
|
||||
utils.execute('openssl', 'req', '-new', '-key', keyfile, '-out',
|
||||
csrfile, '-batch', '-subj', subject)
|
||||
with open(keyfile) as f:
|
||||
private_key = f.read()
|
||||
with open(csrfile) as f:
|
||||
csr = f.read()
|
||||
|
||||
(serial, signed_csr) = sign_csr(csr, project_id)
|
||||
fname = os.path.join(ca_folder(project_id), 'newcerts/%s.pem' % serial)
|
||||
cert = {'user_id': user_id,
|
||||
'project_id': project_id,
|
||||
'file_name': fname}
|
||||
db.certificate_create(context.get_admin_context(), cert)
|
||||
return (private_key, signed_csr)
|
||||
|
||||
|
||||
def generate_winrm_x509_cert(user_id, bits=2048):
|
||||
"""Generate a cert for passwordless auth for user in project."""
|
||||
subject = '/CN=%s' % user_id
|
||||
@ -293,65 +149,3 @@ def _create_x509_openssl_config(conffile, upn):
|
||||
|
||||
with open(conffile, 'w') as file:
|
||||
file.write(content % upn)
|
||||
|
||||
|
||||
def _ensure_project_folder(project_id):
|
||||
if not os.path.exists(ca_path(project_id)):
|
||||
geninter_sh_path = os.path.abspath(
|
||||
os.path.join(os.path.dirname(__file__), 'CA', 'geninter.sh'))
|
||||
utils.execute('sh', geninter_sh_path, project_id,
|
||||
_project_cert_subject(project_id), cwd=ca_folder())
|
||||
|
||||
|
||||
def generate_vpn_files(project_id):
|
||||
project_folder = ca_folder(project_id)
|
||||
key_fn = os.path.join(project_folder, 'server.key')
|
||||
crt_fn = os.path.join(project_folder, 'server.crt')
|
||||
|
||||
if os.path.exists(crt_fn):
|
||||
return
|
||||
# NOTE(vish): The 2048 is to maintain compatibility with the old script.
|
||||
# We are using "project-vpn" as the user_id for the cert
|
||||
# even though that user may not really exist. Ultimately
|
||||
# this will be changed to be launched by a real user. At
|
||||
# that point we will can delete this helper method.
|
||||
key, csr = generate_x509_cert('project-vpn', project_id, 2048)
|
||||
with open(key_fn, 'w') as keyfile:
|
||||
keyfile.write(key)
|
||||
with open(crt_fn, 'w') as crtfile:
|
||||
crtfile.write(csr)
|
||||
|
||||
|
||||
def sign_csr(csr_text, project_id=None):
|
||||
if not CONF.crypto.use_project_ca:
|
||||
project_id = None
|
||||
if not project_id:
|
||||
return _sign_csr(csr_text, ca_folder())
|
||||
_ensure_project_folder(project_id)
|
||||
return _sign_csr(csr_text, ca_folder(project_id))
|
||||
|
||||
|
||||
def _sign_csr(csr_text, ca_folder):
|
||||
with utils.tempdir() as tmpdir:
|
||||
inbound = os.path.join(tmpdir, 'inbound.csr')
|
||||
outbound = os.path.join(tmpdir, 'outbound.csr')
|
||||
|
||||
try:
|
||||
with open(inbound, 'w') as csrfile:
|
||||
csrfile.write(csr_text)
|
||||
except IOError:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(_LE('Failed to write inbound.csr'))
|
||||
|
||||
LOG.debug('Flags path: %s', ca_folder)
|
||||
|
||||
# Change working dir to CA
|
||||
fileutils.ensure_tree(ca_folder)
|
||||
utils.execute('openssl', 'ca', '-batch', '-out', outbound, '-config',
|
||||
'./openssl.cnf', '-infiles', inbound, cwd=ca_folder)
|
||||
out, _err = utils.execute('openssl', 'x509', '-in', outbound,
|
||||
'-serial', '-noout', cwd=ca_folder)
|
||||
serial = out.rpartition('=')[2].strip()
|
||||
|
||||
with open(outbound, 'r') as crtfile:
|
||||
return (serial, crtfile.read())
|
||||
|
@ -14,101 +14,6 @@
|
||||
# under the License.
|
||||
|
||||
|
||||
def ensure_ca_filesystem():
|
||||
pass
|
||||
|
||||
|
||||
def fetch_ca(project_id=None):
|
||||
rootca = """-----BEGIN CERTIFICATE-----
|
||||
MIICyzCCAjSgAwIBAgIJAIJ/UoFWKoOUMA0GCSqGSIb3DQEBBAUAME4xEjAQBgNV
|
||||
BAoTCU5PVkEgUk9PVDEWMBQGA1UEBxMNTW91bnRhaW4gVmlldzETMBEGA1UECBMK
|
||||
Q2FsaWZvcm5pYTELMAkGA1UEBhMCVVMwHhcNMTIxMDAyMTg1NzQ1WhcNMTMxMDAy
|
||||
MTg1NzQ1WjBOMRIwEAYDVQQKEwlOT1ZBIFJPT1QxFjAUBgNVBAcTDU1vdW50YWlu
|
||||
IFZpZXcxEzARBgNVBAgTCkNhbGlmb3JuaWExCzAJBgNVBAYTAlVTMIGfMA0GCSqG
|
||||
SIb3DQEBAQUAA4GNADCBiQKBgQCg0Bn8WSqbJF3QNTZUxo1TzmFBxuqvhjZLKbnQ
|
||||
IiShdVIWUK7RC8frq8FJI7dgJNmvkIBn9njABWDoZmurQRCzD65yCSbUc4R2ea5H
|
||||
IK4wQIui0CJykvMBNjAe3bzztVVs8/ccDTsjtqq3F/KeQkKzQVfSWBrJSmYtG5tO
|
||||
G+dOSwIDAQABo4GwMIGtMAwGA1UdEwQFMAMBAf8wHQYDVR0OBBYEFCljRfaNOsA/
|
||||
9mHuq0io7Lt83FtaMH4GA1UdIwR3MHWAFCljRfaNOsA/9mHuq0io7Lt83FtaoVKk
|
||||
UDBOMRIwEAYDVQQKEwlOT1ZBIFJPT1QxFjAUBgNVBAcTDU1vdW50YWluIFZpZXcx
|
||||
EzARBgNVBAgTCkNhbGlmb3JuaWExCzAJBgNVBAYTAlVTggkAgn9SgVYqg5QwDQYJ
|
||||
KoZIhvcNAQEEBQADgYEAEbpJOOlpKCh5omwfAwAfFg1ml4h/FJiCH3PETmOCc+3l
|
||||
CtWTBd4MG8AoH7A3PU2JKAGVQ5XWo6+ihpW1RgfQpCnloI6vIeGcws+rSLnlzULt
|
||||
IvfCJpRg7iQdR3jZGt3295behtP1GsCqipJEulOkOaEIs8iLlXgSOG94Mkwlb4Q=
|
||||
-----END CERTIFICATE-----
|
||||
"""
|
||||
return rootca
|
||||
|
||||
|
||||
def generate_x509_cert(user_id, project_id, bits=1024):
|
||||
pk = """-----BEGIN RSA PRIVATE KEY-----
|
||||
MIICXAIBAAKBgQC4h2d63ijt9l0fIBRY37D3Yj2FYajCMUlftSoHNA4lEw0uTXnH
|
||||
Jjbd0j7HNlSADWeAMuaoSDNp7CIsXMt6iA/ASN5nFFTZlLRqIzYoI0RHiiSJjvSG
|
||||
d1n4Yrar1eC8tK3Rld1Zo6rj6tOuIxfFVJajJVZykCAHjGNNvulgfhBXFwIDAQAB
|
||||
AoGBAIjfxx4YU/vO1lwUC4OwyS92q3OYcPk6XdakJryZHDTb4NcLmNzjt6bqIK7b
|
||||
2enyB2fMWdNRWvGiueZ2HmiRLDyOGsAVdEsHvL4qbr9EZGTqC8Qxx+zTevWWf6pB
|
||||
F1zxzbXNQDFZDf9kVsSLCkbMHITnW1k4MrM++9gfCO3WrfehAkEA4nd8TyCCZazq
|
||||
KMOQwFLTNaiVLeTXCtvGopl4ZNiKYZ1qI3KDXb2wbAyArFuERlotxFlylXpwtlMo
|
||||
SlI/C/sYqwJBANCX1sdfRJq8DpdP44ThWqOkWFLB9rBiwyyBt8746fX8amwr8eyz
|
||||
H44/z5GT/Vyp8qFsjkuDzeP93eeDnr2qE0UCP1zipRnPO6x4P5J4o+Y+EmLvwkAQ
|
||||
nCLYAaCvUbILHrbq2Z2wWjEYnEO03RHUd2xjkGH4TgcBMTmW4e+ZzEIduwJACnIw
|
||||
LVfWBbG5QVac3EC021EVoz9XbUnk4Eu2usS4Yrs7USN6QBJQWD1V1cKFg6h3ICJh
|
||||
leKJ4wsJm9h5kKH9yQJBAN8CaX223MlTSuBOVuIOwNA+09iLfx4UCLiH1fGMKDpe
|
||||
xVcmkM3qCnTqNxrAPSFdT9IyB3IXiaLWbvzl7MfiOwQ=
|
||||
-----END RSA PRIVATE KEY-----
|
||||
"""
|
||||
csr = """Certificate:
|
||||
Data:
|
||||
Version: 1 (0x0)
|
||||
Serial Number: 23 (0x17)
|
||||
Signature Algorithm: md5WithRSAEncryption
|
||||
Issuer: O=NOVA ROOT, L=Mountain View, ST=California, C=US
|
||||
Validity
|
||||
Not Before: Oct 2 19:31:45 2012 GMT
|
||||
Not After : Oct 2 19:31:45 2013 GMT
|
||||
Subject: C=US, ST=California, O=OpenStack, OU=NovaDev, """
|
||||
"""CN=openstack-fake-2012-10-02T19:31:45Z
|
||||
Subject Public Key Info:
|
||||
Public Key Algorithm: rsaEncryption
|
||||
RSA Public Key: (1024 bit)
|
||||
Modulus (1024 bit):
|
||||
00:b8:87:67:7a:de:28:ed:f6:5d:1f:20:14:58:df:
|
||||
b0:f7:62:3d:85:61:a8:c2:31:49:5f:b5:2a:07:34:
|
||||
0e:25:13:0d:2e:4d:79:c7:26:36:dd:d2:3e:c7:36:
|
||||
54:80:0d:67:80:32:e6:a8:48:33:69:ec:22:2c:5c:
|
||||
cb:7a:88:0f:c0:48:de:67:14:54:d9:94:b4:6a:23:
|
||||
36:28:23:44:47:8a:24:89:8e:f4:86:77:59:f8:62:
|
||||
b6:ab:d5:e0:bc:b4:ad:d1:95:dd:59:a3:aa:e3:ea:
|
||||
d3:ae:23:17:c5:54:96:a3:25:56:72:90:20:07:8c:
|
||||
63:4d:be:e9:60:7e:10:57:17
|
||||
Exponent: 65537 (0x10001)
|
||||
Signature Algorithm: md5WithRSAEncryption
|
||||
32:82:ff:8b:92:0e:8d:9c:6b:ce:7e:fe:34:16:2a:4c:47:4f:
|
||||
c7:28:a2:33:1e:48:56:2e:4b:e8:e8:e3:48:b1:3d:a3:43:21:
|
||||
ef:83:e7:df:e2:10:91:7e:9a:c0:4d:1e:96:68:2b:b9:f7:84:
|
||||
7f:ec:84:8a:bf:bc:5e:50:05:d9:ce:4a:1a:bf:d2:bf:0c:d1:
|
||||
7e:ec:64:c3:a5:37:78:a3:a6:2b:a1:b7:1c:cc:c8:b9:78:61:
|
||||
98:50:3c:e6:28:34:f1:0e:62:bb:b5:d7:a1:dd:1f:38:c6:0d:
|
||||
58:9f:81:67:ff:9c:32:fc:52:7e:6d:8c:91:43:49:fe:e3:48:
|
||||
bb:40
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICMzCCAZwCARcwDQYJKoZIhvcNAQEEBQAwTjESMBAGA1UEChMJTk9WQSBST09U
|
||||
MRYwFAYDVQQHEw1Nb3VudGFpbiBWaWV3MRMwEQYDVQQIEwpDYWxpZm9ybmlhMQsw
|
||||
CQYDVQQGEwJVUzAeFw0xMjEwMDIxOTMxNDVaFw0xMzEwMDIxOTMxNDVaMHYxCzAJ
|
||||
BgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRIwEAYDVQQKEwlPcGVuU3Rh
|
||||
Y2sxEDAOBgNVBAsTB05vdmFEZXYxLDAqBgNVBAMTI29wZW5zdGFjay1mYWtlLTIw
|
||||
MTItMTAtMDJUMTk6MzE6NDVaMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC4
|
||||
h2d63ijt9l0fIBRY37D3Yj2FYajCMUlftSoHNA4lEw0uTXnHJjbd0j7HNlSADWeA
|
||||
MuaoSDNp7CIsXMt6iA/ASN5nFFTZlLRqIzYoI0RHiiSJjvSGd1n4Yrar1eC8tK3R
|
||||
ld1Zo6rj6tOuIxfFVJajJVZykCAHjGNNvulgfhBXFwIDAQABMA0GCSqGSIb3DQEB
|
||||
BAUAA4GBADKC/4uSDo2ca85+/jQWKkxHT8coojMeSFYuS+jo40ixPaNDIe+D59/i
|
||||
EJF+msBNHpZoK7n3hH/shIq/vF5QBdnOShq/0r8M0X7sZMOlN3ijpiuhtxzMyLl4
|
||||
YZhQPOYoNPEOYru116HdHzjGDVifgWf/nDL8Un5tjJFDSf7jSLtA
|
||||
-----END CERTIFICATE-----
|
||||
"""
|
||||
return pk, csr
|
||||
|
||||
|
||||
def get_x509_cert_and_fingerprint():
|
||||
fingerprint = "a1:6f:6d:ea:a6:36:d0:3a:c6:eb:b6:ee:07:94:3e:2a:90:98:2b:c9"
|
||||
certif = (
|
||||
|
@ -28,157 +28,9 @@ import six
|
||||
from nova import crypto
|
||||
from nova import exception
|
||||
from nova import test
|
||||
from nova.tests import uuidsentinel as uuids
|
||||
from nova import utils
|
||||
|
||||
|
||||
class X509Test(test.NoDBTestCase):
|
||||
@mock.patch('nova.db.certificate_create')
|
||||
def test_can_generate_x509(self, mock_create):
|
||||
with utils.tempdir() as tmpdir:
|
||||
self.flags(ca_path=tmpdir, group='crypto')
|
||||
crypto.ensure_ca_filesystem()
|
||||
_key, cert_str = crypto.generate_x509_cert('fake', 'fake')
|
||||
|
||||
project_cert = crypto.fetch_ca(project_id='fake')
|
||||
|
||||
signed_cert_file = os.path.join(tmpdir, "signed")
|
||||
with open(signed_cert_file, 'w') as keyfile:
|
||||
keyfile.write(cert_str)
|
||||
|
||||
project_cert_file = os.path.join(tmpdir, "project")
|
||||
with open(project_cert_file, 'w') as keyfile:
|
||||
keyfile.write(project_cert)
|
||||
|
||||
enc, err = utils.execute('openssl', 'verify', '-CAfile',
|
||||
project_cert_file, '-verbose', signed_cert_file)
|
||||
self.assertFalse(err)
|
||||
|
||||
def test_encrypt_decrypt_x509(self):
|
||||
with utils.tempdir() as tmpdir:
|
||||
self.flags(ca_path=tmpdir, group='crypto')
|
||||
project_id = "fake"
|
||||
crypto.ensure_ca_filesystem()
|
||||
|
||||
cert = crypto.fetch_ca(project_id)
|
||||
public_key = os.path.join(tmpdir, "public.pem")
|
||||
with open(public_key, 'w') as keyfile:
|
||||
keyfile.write(cert)
|
||||
|
||||
text = "some @#!%^* test text"
|
||||
process_input = text.encode("ascii") if six.PY3 else text
|
||||
enc, _err = utils.execute('openssl',
|
||||
'rsautl',
|
||||
'-certin',
|
||||
'-encrypt',
|
||||
'-inkey', '%s' % public_key,
|
||||
process_input=process_input,
|
||||
binary=True)
|
||||
|
||||
dec = crypto.decrypt_text(project_id, enc)
|
||||
self.assertIsInstance(dec, bytes)
|
||||
if six.PY3:
|
||||
dec = dec.decode('ascii')
|
||||
self.assertEqual(text, dec)
|
||||
|
||||
@mock.patch.object(utils, 'execute',
|
||||
side_effect=processutils.ProcessExecutionError)
|
||||
def test_ensure_ca_filesystem_chdir(self, *args, **kargs):
|
||||
with utils.tempdir() as tmpdir:
|
||||
self.flags(ca_path=tmpdir, group='crypto')
|
||||
start = os.getcwd()
|
||||
self.assertRaises(processutils.ProcessExecutionError,
|
||||
crypto.ensure_ca_filesystem)
|
||||
self.assertEqual(start, os.getcwd())
|
||||
|
||||
|
||||
class RevokeCertsTest(test.NoDBTestCase):
|
||||
|
||||
@mock.patch('nova.crypto.revoke_cert')
|
||||
def test_revoke_certs_by_user_and_project(self, mock_revoke):
|
||||
user_id = 'test_user'
|
||||
project_id = 2
|
||||
file_name = 'test_file'
|
||||
|
||||
def mock_certificate_get_all_by_user_and_project(context,
|
||||
user_id,
|
||||
project_id):
|
||||
|
||||
return [{"user_id": user_id, "project_id": project_id,
|
||||
"file_name": file_name}]
|
||||
|
||||
self.stub_out('nova.db.certificate_get_all_by_user_and_project',
|
||||
mock_certificate_get_all_by_user_and_project)
|
||||
|
||||
crypto.revoke_certs_by_user_and_project(user_id, project_id)
|
||||
|
||||
mock_revoke.assert_called_once_with(project_id, file_name)
|
||||
|
||||
@mock.patch('nova.crypto.revoke_cert')
|
||||
def test_revoke_certs_by_user(self, mock_revoke):
|
||||
user_id = 'test_user'
|
||||
project_id = 2
|
||||
file_name = 'test_file'
|
||||
|
||||
def mock_certificate_get_all_by_user(context, user_id):
|
||||
|
||||
return [{"user_id": user_id, "project_id": project_id,
|
||||
"file_name": file_name}]
|
||||
|
||||
self.stub_out('nova.db.certificate_get_all_by_user',
|
||||
mock_certificate_get_all_by_user)
|
||||
|
||||
crypto.revoke_certs_by_user(user_id)
|
||||
mock_revoke.assert_called_once_with(project_id, mock.ANY)
|
||||
|
||||
@mock.patch('nova.crypto.revoke_cert')
|
||||
def test_revoke_certs_by_project(self, mock_revoke):
|
||||
user_id = 'test_user'
|
||||
project_id = 2
|
||||
file_name = 'test_file'
|
||||
|
||||
def mock_certificate_get_all_by_project(context, project_id):
|
||||
|
||||
return [{"user_id": user_id, "project_id": project_id,
|
||||
"file_name": file_name}]
|
||||
|
||||
self.stub_out('nova.db.certificate_get_all_by_project',
|
||||
mock_certificate_get_all_by_project)
|
||||
|
||||
crypto.revoke_certs_by_project(project_id)
|
||||
mock_revoke.assert_called_once_with(project_id, mock.ANY)
|
||||
|
||||
@mock.patch.object(utils, 'execute',
|
||||
side_effect=processutils.ProcessExecutionError)
|
||||
@mock.patch.object(os, 'chdir', return_value=None)
|
||||
def test_revoke_cert_process_execution_error(self, *args, **kargs):
|
||||
self.assertRaises(exception.RevokeCertFailure, crypto.revoke_cert,
|
||||
2, 'test_file')
|
||||
|
||||
def test_revoke_cert_project_not_found_chdir_fails(self, *args, **kargs):
|
||||
self.flags(use_project_ca=True, group='crypto')
|
||||
self.assertRaises(exception.ProjectNotFound, crypto.revoke_cert,
|
||||
uuids.fake, 'test_file')
|
||||
|
||||
|
||||
class CertExceptionTests(test.NoDBTestCase):
|
||||
def test_fetch_ca_file_not_found(self):
|
||||
with utils.tempdir() as tmpdir:
|
||||
self.flags(ca_path=tmpdir, group='crypto')
|
||||
self.flags(use_project_ca=True, group='crypto')
|
||||
|
||||
self.assertRaises(exception.CryptoCAFileNotFound, crypto.fetch_ca,
|
||||
project_id='fake')
|
||||
|
||||
def test_fetch_crl_file_not_found(self):
|
||||
with utils.tempdir() as tmpdir:
|
||||
self.flags(ca_path=tmpdir, group='crypto')
|
||||
self.flags(use_project_ca=True, group='crypto')
|
||||
|
||||
self.assertRaises(exception.CryptoCRLFileNotFound,
|
||||
crypto.fetch_crl, project_id='fake')
|
||||
|
||||
|
||||
class EncryptionTests(test.NoDBTestCase):
|
||||
pubkey = ("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDArtgrfBu/g2o28o+H2ng/crv"
|
||||
"zgES91i/NNPPFTOutXelrJ9QiPTPTm+B8yspLsXifmbsmXztNOlBQgQXs6usxb4"
|
||||
|
Loading…
Reference in New Issue
Block a user