1a11790f2d
There are some small warts (mostly around other execute()-like methods which reuse the exception). I will fix these warts in later reviews. Change-Id: Ice9cdbdc5f3e5a9f8365f5d99acf1863a9fe3e7a
244 lines
9.9 KiB
Python
244 lines
9.9 KiB
Python
# Copyright 2011 OpenStack Foundation
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
"""
|
|
Tests for Crypto module.
|
|
"""
|
|
|
|
import os
|
|
|
|
import mox
|
|
|
|
from nova import crypto
|
|
from nova import db
|
|
from nova import exception
|
|
from nova.openstack.common import processutils
|
|
from nova import test
|
|
from nova import utils
|
|
|
|
|
|
class X509Test(test.TestCase):
    """x509 certificate tests that shell out to the openssl binary."""

    def test_can_generate_x509(self):
        """Generate a cert and run `openssl verify` against the fetched CA.

        The test passes when the verify command writes nothing to stderr.
        """
        with utils.tempdir() as ca_dir:
            self.flags(ca_path=ca_dir)
            crypto.ensure_ca_filesystem()
            _key, cert_str = crypto.generate_x509_cert('fake', 'fake')

            project_cert = crypto.fetch_ca(project_id='fake')

            # Both certificates are written to disk because openssl is
            # handed file paths on its command line.
            signed_cert_file = os.path.join(ca_dir, "signed")
            with open(signed_cert_file, 'w') as signed_fh:
                signed_fh.write(cert_str)

            project_cert_file = os.path.join(ca_dir, "project")
            with open(project_cert_file, 'w') as project_fh:
                project_fh.write(project_cert)

            verify_cmd = ('openssl', 'verify', '-CAfile', project_cert_file,
                          '-verbose', signed_cert_file)
            # Only stderr matters here; stdout is discarded.
            _out, err = utils.execute(*verify_cmd)
            self.assertFalse(err)

    def test_encrypt_decrypt_x509(self):
        """Encrypt with openssl using the CA cert, decrypt via crypto."""
        with utils.tempdir() as ca_dir:
            self.flags(ca_path=ca_dir)
            project_id = "fake"
            crypto.ensure_ca_filesystem()

            # The fetched CA certificate serves as the encryption key file.
            public_key = os.path.join(ca_dir, "public.pem")
            ca_cert = crypto.fetch_ca(project_id)
            with open(public_key, 'w') as cert_fh:
                cert_fh.write(ca_cert)

            text = "some @#!%^* test text"
            encrypt_cmd = ['openssl', 'rsautl', '-certin', '-encrypt',
                           '-inkey', public_key]
            enc, _err = utils.execute(*encrypt_cmd, process_input=text)

            dec = crypto.decrypt_text(project_id, enc)
            self.assertEqual(text, dec)
|
|
|
|
|
|
class RevokeCertsTest(test.TestCase):
    """Tests for the crypto.revoke_certs_by_* functions.

    Each test stubs the matching db.certificate_get_all_by_* getter so that
    it returns a single certificate row, records one expected call to
    crypto.revoke_cert with mox, and then invokes the function under test.
    """

    def test_revoke_certs_by_user_and_project(self):
        """Expect revoke_cert(project_id, file_name) for the stubbed row."""
        user_id = 'test_user'
        project_id = 2
        file_name = 'test_file'

        def mock_certificate_get_all_by_user_and_project(context,
                                                         user_id,
                                                         project_id):
            # user_id/project_id come from the call; file_name is closed
            # over from the enclosing test.
            cert = {"file_name": file_name}
            cert["user_id"] = user_id
            cert["project_id"] = project_id
            return [cert]

        self.stubs.Set(db, 'certificate_get_all_by_user_and_project',
                       mock_certificate_get_all_by_user_and_project)

        # Record phase: exactly one revoke_cert call is expected.
        self.mox.StubOutWithMock(crypto, 'revoke_cert')
        crypto.revoke_cert(project_id, file_name)
        self.mox.ReplayAll()

        crypto.revoke_certs_by_user_and_project(user_id, project_id)

    def test_revoke_certs_by_user(self):
        """Expect revoke_cert(project_id, <anything>) for the stubbed row."""
        user_id = 'test_user'
        project_id = 2
        file_name = 'test_file'

        def mock_certificate_get_all_by_user(context, user_id):
            # project_id and file_name are closed over from the test.
            cert = {"file_name": file_name}
            cert["user_id"] = user_id
            cert["project_id"] = project_id
            return [cert]

        self.stubs.Set(db, 'certificate_get_all_by_user',
                       mock_certificate_get_all_by_user)

        # Record phase: the second argument is not checked.
        self.mox.StubOutWithMock(crypto, 'revoke_cert')
        crypto.revoke_cert(project_id, mox.IgnoreArg())
        self.mox.ReplayAll()

        crypto.revoke_certs_by_user(user_id)

    def test_revoke_certs_by_project(self):
        """Expect revoke_cert(project_id, <anything>) for the stubbed row."""
        user_id = 'test_user'
        project_id = 2
        file_name = 'test_file'

        def mock_certificate_get_all_by_project(context, project_id):
            # user_id and file_name are closed over from the test.
            cert = {"file_name": file_name}
            cert["user_id"] = user_id
            cert["project_id"] = project_id
            return [cert]

        self.stubs.Set(db, 'certificate_get_all_by_project',
                       mock_certificate_get_all_by_project)

        # Record phase: the second argument is not checked.
        self.mox.StubOutWithMock(crypto, 'revoke_cert')
        crypto.revoke_cert(project_id, mox.IgnoreArg())
        self.mox.ReplayAll()

        crypto.revoke_certs_by_project(project_id)
|
|
|
|
|
|
class CertExceptionTests(test.TestCase):
    """Fetching CA/CRL data from an empty ca_path must raise."""

    def test_fetch_ca_file_not_found(self):
        """fetch_ca raises CryptoCAFileNotFound with an empty ca_path."""
        expected_exc = exception.CryptoCAFileNotFound
        with utils.tempdir() as empty_ca_dir:
            # A fresh temp dir is used as ca_path and
            # ensure_ca_filesystem() is deliberately not called.
            self.flags(ca_path=empty_ca_dir)
            self.flags(use_project_ca=True)

            self.assertRaises(expected_exc,
                              crypto.fetch_ca,
                              project_id='fake')

    def test_fetch_crl_file_not_found(self):
        """fetch_crl raises CryptoCRLFileNotFound with an empty ca_path."""
        expected_exc = exception.CryptoCRLFileNotFound
        with utils.tempdir() as empty_ca_dir:
            # A fresh temp dir is used as ca_path and
            # ensure_ca_filesystem() is deliberately not called.
            self.flags(ca_path=empty_ca_dir)
            self.flags(use_project_ca=True)

            self.assertRaises(expected_exc,
                              crypto.fetch_crl,
                              project_id='fake')
|
|
|
|
|
|
class EncryptionTests(test.TestCase):
    """Tests for crypto.ssh_encrypt_text using a fixed RSA key pair."""

    # Public key in OpenSSH "ssh-rsa" one-line format, used for encryption.
    pubkey = ("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDArtgrfBu/g2o28o+H2ng/crv"
              "zgES91i/NNPPFTOutXelrJ9QiPTPTm+B8yspLsXifmbsmXztNOlBQgQXs6usxb4"
              "fnJKNUZ84Vkp5esbqK/L7eyRqwPvqo7btKBMoAMVX/kUyojMpxb7Ssh6M6Y8cpi"
              "goi+MSDPD7+5yRJ9z4mH9h7MCY6Ejv8KTcNYmVHvRhsFUcVhWcIISlNWUGiG7rf"
              "oki060F5myQN3AXcL8gHG5/Qb1RVkQFUKZ5geQ39/wSyYA1Q65QTba/5G2QNbl2"
              "0eAIBTyKZhN6g88ak+yARa6BLLDkrlP7L4WctHQMLsuXHohQsUO9AcOlVMARgrg"
              "uF test@test")
    # PEM-encoded RSA private key handed to openssl for decryption; the
    # round-trip test relies on it being the counterpart of `pubkey`.
    prikey = """-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAwK7YK3wbv4NqNvKPh9p4P3K784BEvdYvzTTzxUzrrV3payfU
Ij0z05vgfMrKS7F4n5m7Jl87TTpQUIEF7OrrMW+H5ySjVGfOFZKeXrG6ivy+3ska
sD76qO27SgTKADFV/5FMqIzKcW+0rIejOmPHKYoKIvjEgzw+/uckSfc+Jh/YezAm
OhI7/Ck3DWJlR70YbBVHFYVnCCEpTVlBohu636JItOtBeZskDdwF3C/IBxuf0G9U
VZEBVCmeYHkN/f8EsmANUOuUE22v+RtkDW5dtHgCAU8imYTeoPPGpPsgEWugSyw5
K5T+y+FnLR0DC7Llx6IULFDvQHDpVTAEYK4LhQIDAQABAoIBAF9ibrrgHnBpItx+
qVUMbriiGK8LUXxUmqdQTljeolDZi6KzPc2RVKWtpazBSvG7skX3+XCediHd+0JP
DNri1HlNiA6B0aUIGjoNsf6YpwsE4YwyK9cR5k5YGX4j7se3pKX2jOdngxQyw1Mh
dkmCeWZz4l67nbSFz32qeQlwrsB56THJjgHB7elDoGCXTX/9VJyjFlCbfxVCsIng
inrNgT0uMSYMNpAjTNOjguJt/DtXpwzei5eVpsERe0TRRVH23ycS0fuq/ancYwI/
MDr9KSB8r+OVGeVGj3popCxECxYLBxhqS1dAQyJjhQXKwajJdHFzidjXO09hLBBz
FiutpYUCgYEA6OFikTrPlCMGMJjSj+R9woDAOPfvCDbVZWfNo8iupiECvei88W28
RYFnvUQRjSC0pHe//mfUSmiEaE+SjkNCdnNR+vsq9q+htfrADm84jl1mfeWatg/g
zuGz2hAcZnux3kQMI7ufOwZNNpM2bf5B4yKamvG8tZRRxSkkAL1NV48CgYEA08/Z
Ty9g9XPKoLnUWStDh1zwG+c0q14l2giegxzaUAG5DOgOXbXcw0VQ++uOWD5ARELG
g9wZcbBsXxJrRpUqx+GAlv2Y1bkgiPQS1JIyhsWEUtwfAC/G+uZhCX53aI3Pbsjh
QmkPCSp5DuOuW2PybMaw+wVe+CaI/gwAWMYDAasCgYEA4Fzkvc7PVoU33XIeywr0
LoQkrb4QyPUrOvt7H6SkvuFm5thn0KJMlRpLfAksb69m2l2U1+HooZd4mZawN+eN
DNmlzgxWJDypq83dYwq8jkxmBj1DhMxfZnIE+L403nelseIVYAfPLOqxUTcbZXVk
vRQFp+nmSXqQHUe5rAy1ivkCgYEAqLu7cclchCxqDv/6mc5NTVhMLu5QlvO5U6fq
HqitgW7d69oxF5X499YQXZ+ZFdMBf19ypTiBTIAu1M3nh6LtIa4SsjXzus5vjKpj
FdQhTBus/hU83Pkymk1MoDOPDEtsI+UDDdSDldmv9pyKGWPVi7H86vusXCLWnwsQ
e6fCXWECgYEAqgpGvva5kJ1ISgNwnJbwiNw0sOT9BMOsdNZBElf0kJIIy6FMPvap
6S1ziw+XWfdQ83VIUOCL5DrwmcYzLIogS0agmnx/monfDx0Nl9+OZRxy6+AI9vkK
86A1+DXdo+IgX3grFK1l1gPhAZPRWJZ+anrEkyR4iLq6ZoPZ3BQn97U=
-----END RSA PRIVATE KEY-----"""
    # Plaintext sample for the round-trip test.
    text = "Some text! %$*"

    def _ssh_decrypt_text(self, ssh_private_key, text):
        """Decrypt `text` with `openssl rsautl -decrypt`.

        :param ssh_private_key: private key contents; written to a file in
                                a temporary directory so it can be passed to
                                openssl via -inkey
        :param text: encrypted data fed to openssl on stdin
        :returns: stdout of the openssl command (the decrypted data)
        :raises: exception.DecryptionFailure carrying openssl's stderr when
                 the command raises ProcessExecutionError
        """
        with utils.tempdir() as tmpdir:
            sshkey = os.path.abspath(os.path.join(tmpdir, 'ssh.key'))
            with open(sshkey, 'w') as f:
                f.write(ssh_private_key)
            # Keep the try body limited to the one call that can raise
            # ProcessExecutionError.
            try:
                dec, _err = utils.execute('openssl',
                                          'rsautl',
                                          '-decrypt',
                                          '-inkey', sshkey,
                                          process_input=text)
            except processutils.ProcessExecutionError as exc:
                raise exception.DecryptionFailure(reason=exc.stderr)
            return dec

    def test_ssh_encrypt_decrypt_text(self):
        """Encrypting then decrypting must give back the original text."""
        enc = crypto.ssh_encrypt_text(self.pubkey, self.text)
        # The ciphertext must differ from the plaintext.
        self.assertNotEqual(enc, self.text)
        result = self._ssh_decrypt_text(self.prikey, enc)
        # Expected value first, observed value second, so a failure report
        # labels the two sides correctly.
        self.assertEqual(self.text, result)

    def test_ssh_encrypt_failure(self):
        """An empty public key must raise EncryptionFailure."""
        self.assertRaises(exception.EncryptionFailure,
                          crypto.ssh_encrypt_text, '', self.text)
|
|
|
|
|
|
class ConversionTests(test.TestCase):
    """Tests for crypto.convert_from_sshrsa_to_pkcs8."""

    # Input: RSA public key in OpenSSH "ssh-rsa" one-line format.
    k1 = ("ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4CqmrxfU7x4sJrubpMNxeglul+d"
          "ByrsicnvQcHDEjPzdvoz+BaoAG9bjCA5mCeTBIISsVTVXz/hxNeiuBV6LH/UR/c"
          "27yl53ypN+821ImoexQZcKItdnjJ3gVZlDob1f9+1qDVy63NJ1c+TstkrCTRVeo"
          "9VyE7RpdSS4UCiBe8Xwk3RkedioFxePrI0Ktc2uASw2G0G2Rl7RN7KZOJbCivfF"
          "LQMAOu6e+7fYvuE1gxGHHj7dxaBY/ioGOm1W4JmQ1V7AKt19zTBlZKduN8FQMSF"
          "r35CDlvoWs0+OP8nwlebKNCi/5sdL8qiSLrAcPB4LqdkAf/blNSVA2Yl83/c4lQ"
          "== test@test")

    # Expected output: the PEM "PUBLIC KEY" block that converting k1 must
    # produce, including the trailing newline.
    k2 = ("-----BEGIN PUBLIC KEY-----\n"
          "MIIBIDANBgkqhkiG9w0BAQEFAAOCAQ0AMIIBCAKCAQEA4CqmrxfU7x4sJrubpMNx\n"
          "eglul+dByrsicnvQcHDEjPzdvoz+BaoAG9bjCA5mCeTBIISsVTVXz/hxNeiuBV6L\n"
          "H/UR/c27yl53ypN+821ImoexQZcKItdnjJ3gVZlDob1f9+1qDVy63NJ1c+TstkrC\n"
          "TRVeo9VyE7RpdSS4UCiBe8Xwk3RkedioFxePrI0Ktc2uASw2G0G2Rl7RN7KZOJbC\n"
          "ivfFLQMAOu6e+7fYvuE1gxGHHj7dxaBY/ioGOm1W4JmQ1V7AKt19zTBlZKduN8FQ\n"
          "MSFr35CDlvoWs0+OP8nwlebKNCi/5sdL8qiSLrAcPB4LqdkAf/blNSVA2Yl83/c4\n"
          "lQIBIw==\n"
          "-----END PUBLIC KEY-----\n")

    def test_convert_keys(self):
        """Converting k1 must yield exactly k2."""
        result = crypto.convert_from_sshrsa_to_pkcs8(self.k1)
        # Expected value first, observed value second, so a failure report
        # labels the two sides correctly.
        self.assertEqual(self.k2, result)

    def test_convert_failure(self):
        """An empty key string must raise EncryptionFailure."""
        self.assertRaises(exception.EncryptionFailure,
                          crypto.convert_from_sshrsa_to_pkcs8, '')
|