Port crypto to Python 3

Fix bytes vs Unicode issues:

* ssh_encrypt_text() now encodes text to UTF-8 if text type is Unicode.
* Encode Unicode to ASCII before encoding it to base64
* On Python 3, decode binascii.hexlify() and base64.encodestring()
  output from ASCII to get Unicode (str type in Python 3) is required.
* convert_from_sshrsa_to_pkcs8(): reuse binascii.hexlify() instead of
  a loop using struck.unpack() and "%02X" format.
* Replace str.encode('hex') with binascii.hexlify(str)
* Replace string.strip(text) with text.strip()
* Replace StringIO.StringIO with six.StringIO
* Add test on the output type of ssh_encrypt_text() and decrypt_text()
* Call utils.execute() with binary=True to not decode stdout/stderr
  (to get them as raw bytes).
* Replace reduce() with six.moves.reduce()
* convert_version_to_str(): replace a/b with a//b to get integer
  division
* tox.ini: add the following tests to Python 3.4

  - nova.tests.unit.compute.test_keypairs
  - nova.tests.unit.test_crypto

Blueprint nova-python3
Change-Id: I83d927166c0864020b205ac7495473795da7830d
This commit is contained in:
Victor Stinner 2015-07-01 17:48:54 +02:00
parent 22c3e7437a
commit b2c55421d6
4 changed files with 66 additions and 30 deletions

View File

@ -26,7 +26,6 @@ import base64
import binascii
import os
import re
import string
import struct
from oslo_concurrency import processutils
@ -133,7 +132,7 @@ def generate_fingerprint(public_key):
try:
parts = public_key.split(' ')
ssh_alg = parts[0]
pub_data = parts[1].decode('base64')
pub_data = base64.b64decode(parts[1])
if ssh_alg == 'ssh-rsa':
pkey = paramiko.RSAKey(data=pub_data)
elif ssh_alg == 'ssh-dss':
@ -144,8 +143,10 @@ def generate_fingerprint(public_key):
raise exception.InvalidKeypair(
reason=_('Unknown ssh key type %s') % ssh_alg)
raw_fp = binascii.hexlify(pkey.get_fingerprint())
if six.PY3:
raw_fp = raw_fp.decode('ascii')
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
except (IndexError, UnicodeDecodeError, binascii.Error,
except (TypeError, IndexError, UnicodeDecodeError, binascii.Error,
paramiko.ssh_exception.SSHException):
raise exception.InvalidKeypair(
reason=_('failed to generate fingerprint'))
@ -153,10 +154,12 @@ def generate_fingerprint(public_key):
def generate_x509_fingerprint(pem_key):
try:
if isinstance(pem_key, six.text_type):
pem_key = pem_key.encode('utf-8')
(out, _err) = utils.execute('openssl', 'x509', '-inform', 'PEM',
'-fingerprint', '-noout',
process_input=pem_key)
fingerprint = string.strip(out.rpartition('=')[2])
fingerprint = out.rpartition('=')[2].strip()
return fingerprint.lower()
except processutils.ProcessExecutionError as ex:
raise exception.InvalidKeypair(
@ -166,7 +169,7 @@ def generate_x509_fingerprint(pem_key):
def generate_key_pair(bits=2048):
key = paramiko.RSAKey.generate(bits)
keyout = six.BytesIO()
keyout = six.StringIO()
key.write_private_key(keyout)
private_key = keyout.getvalue()
public_key = '%s %s Generated-by-Nova' % (key.get_name(), key.get_base64())
@ -194,7 +197,8 @@ def decrypt_text(project_id, text):
'rsautl',
'-decrypt',
'-inkey', '%s' % private_key,
process_input=text)
process_input=text,
binary=True)
return dec
except processutils.ProcessExecutionError as exc:
raise exception.DecryptionFailure(reason=exc.stderr)
@ -241,20 +245,20 @@ def convert_from_sshrsa_to_pkcs8(pubkey):
# +- INTEGER 65537
# Build the sequence for the bit string
n_val = int(
''.join(['%02X' % struct.unpack('B', x)[0] for x in parts[2]]), 16)
e_val = int(
''.join(['%02X' % struct.unpack('B', x)[0] for x in parts[1]]), 16)
n_val = int(binascii.hexlify(parts[2]), 16)
e_val = int(binascii.hexlify(parts[1]), 16)
pkinfo = _to_sequence(univ.Integer(n_val), univ.Integer(e_val))
# Convert the sequence into a bit string
pklong = long(der_encoder.encode(pkinfo).encode('hex'), 16)
pklong = int(binascii.hexlify(der_encoder.encode(pkinfo)), 16)
pkbitstring = univ.BitString("'00%s'B" % bin(pklong)[2:])
# Build the key data structure
oid = _to_sequence(_RSA_OID, univ.Null())
pkcs1_seq = _to_sequence(oid, pkbitstring)
pkcs8 = base64.encodestring(der_encoder.encode(pkcs1_seq))
pkcs8 = base64.b64encode(der_encoder.encode(pkcs1_seq))
if six.PY3:
pkcs8 = pkcs8.decode('ascii')
# Remove the embedded new line and format the key, each line
# should be 64 characters long
@ -264,7 +268,11 @@ def convert_from_sshrsa_to_pkcs8(pubkey):
def ssh_encrypt_text(ssh_public_key, text):
"""Encrypt text with an ssh public key.
If text is a Unicode string, encode it to UTF-8.
"""
if isinstance(text, six.text_type):
text = text.encode('utf-8')
with utils.tempdir() as tmpdir:
sslkey = os.path.abspath(os.path.join(tmpdir, 'ssl.key'))
try:
@ -277,7 +285,8 @@ def ssh_encrypt_text(ssh_public_key, text):
'-pubin',
'-inkey', sslkey,
'-keyform', 'PEM',
process_input=text)
process_input=text,
binary=True)
return enc
except processutils.ProcessExecutionError as exc:
raise exception.EncryptionFailure(reason=exc.stderr)
@ -375,14 +384,19 @@ def generate_winrm_x509_cert(user_id, bits=2048):
'openssl', 'req', '-x509', '-nodes', '-days', '3650',
'-config', conffile, '-newkey', 'rsa:%s' % bits,
'-outform', 'PEM', '-keyout', keyfile, '-subj', subject,
'-extensions', 'v3_req_client')
'-extensions', 'v3_req_client',
binary=True)
(out, _err) = utils.execute('openssl', 'pkcs12', '-export',
'-inkey', keyfile, '-password', 'pass:',
process_input=certificate)
process_input=certificate,
binary=True)
private_key = out.encode('base64')
private_key = base64.b64encode(out)
fingerprint = generate_x509_fingerprint(certificate)
if six.PY3:
private_key = private_key.decode('ascii')
certificate = certificate.decode('utf-8')
return (private_key, certificate, fingerprint)
@ -459,7 +473,7 @@ def _sign_csr(csr_text, ca_folder):
'./openssl.cnf', '-infiles', inbound)
out, _err = utils.execute('openssl', 'x509', '-in', outbound,
'-serial', '-noout')
serial = string.strip(out.rpartition('=')[2])
serial = out.rpartition('=')[2].strip()
os.chdir(start)
with open(outbound, 'r') as crtfile:

View File

@ -16,13 +16,14 @@
Tests for Crypto module.
"""
import base64
import os
import StringIO
import mock
from mox3 import mox
from oslo_concurrency import processutils
import paramiko
import six
from nova import crypto
from nova import db
@ -57,18 +58,26 @@ class X509Test(test.TestCase):
self.flags(ca_path=tmpdir)
project_id = "fake"
crypto.ensure_ca_filesystem()
cert = crypto.fetch_ca(project_id)
public_key = os.path.join(tmpdir, "public.pem")
with open(public_key, 'w') as keyfile:
keyfile.write(cert)
text = "some @#!%^* test text"
process_input = text.encode("ascii") if six.PY3 else text
enc, _err = utils.execute('openssl',
'rsautl',
'-certin',
'-encrypt',
'-inkey', '%s' % public_key,
process_input=text)
process_input=process_input,
binary=True)
dec = crypto.decrypt_text(project_id, enc)
self.assertIsInstance(dec, bytes)
if six.PY3:
dec = dec.decode('ascii')
self.assertEqual(text, dec)
@mock.patch.object(utils, 'execute',
@ -224,15 +233,23 @@ e6fCXWECgYEAqgpGvva5kJ1ISgNwnJbwiNw0sOT9BMOsdNZBElf0kJIIy6FMPvap
'rsautl',
'-decrypt',
'-inkey', sshkey,
process_input=text)
process_input=text,
binary=True)
return dec
except processutils.ProcessExecutionError as exc:
raise exception.DecryptionFailure(reason=exc.stderr)
def test_ssh_encrypt_decrypt_text(self):
enc = crypto.ssh_encrypt_text(self.pubkey, self.text)
self.assertNotEqual(enc, self.text)
self.assertIsInstance(enc, bytes)
# Comparison between bytes and str raises a TypeError
# when using python3 -bb
if six.PY2:
self.assertNotEqual(enc, self.text)
result = self._ssh_decrypt_text(self.prikey, enc)
self.assertIsInstance(result, bytes)
if six.PY3:
result = result.decode('utf-8')
self.assertEqual(result, self.text)
def test_ssh_encrypt_failure(self):
@ -346,19 +363,22 @@ class KeyPairTest(test.TestCase):
def test_generate_key_pair_2048_bits(self):
(private_key, public_key, fingerprint) = crypto.generate_key_pair()
raw_pub = public_key.split(' ')[1].decode('base64')
raw_pub = public_key.split(' ')[1]
if six.PY3:
raw_pub = raw_pub.encode('ascii')
raw_pub = base64.b64decode(raw_pub)
pkey = paramiko.rsakey.RSAKey(None, raw_pub)
self.assertEqual(2048, pkey.get_bits())
def test_generate_key_pair_1024_bits(self):
bits = 1024
(private_key, public_key, fingerprint) = crypto.generate_key_pair(bits)
raw_pub = public_key.split(' ')[1].decode('base64')
raw_pub = base64.b64decode(public_key.split(' ')[1])
pkey = paramiko.rsakey.RSAKey(None, raw_pub)
self.assertEqual(bits, pkey.get_bits())
def test_generate_key_pair_mocked_private_key(self):
keyin = StringIO.StringIO()
keyin = six.StringIO()
keyin.write(self.rsa_prv)
keyin.seek(0)
key = paramiko.RSAKey.from_private_key(keyin)

View File

@ -1048,7 +1048,7 @@ def convert_version_to_int(version):
if isinstance(version, six.string_types):
version = convert_version_to_tuple(version)
if isinstance(version, tuple):
return reduce(lambda x, y: (x * 1000) + y, version)
return six.moves.reduce(lambda x, y: (x * 1000) + y, version)
except Exception:
msg = _("Hypervisor version %s is invalid.") % version
raise exception.NovaException(msg)
@ -1060,9 +1060,9 @@ def convert_version_to_str(version_int):
while version_int != 0:
version_number = version_int - (version_int // factor * factor)
version_numbers.insert(0, str(version_number))
version_int = version_int / factor
version_int = version_int // factor
return reduce(lambda x, y: "%s.%s" % (x, y), version_numbers)
return six.moves.reduce(lambda x, y: "%s.%s" % (x, y), version_numbers)
def convert_version_to_tuple(version_str):

View File

@ -36,9 +36,8 @@ deps = -r{toxinidir}/requirements.txt
commands =
find . -type f -name "*.pyc" -delete
python -m testtools.run \
nova.tests.unit.test_exception \
nova.tests.unit.compute.test_keypairs \
nova.tests.unit.db.test_db_api \
nova.tests.unit.test_versions \
nova.tests.unit.objects.test_agent \
nova.tests.unit.objects.test_aggregate \
nova.tests.unit.objects.test_bandwidth_usage \
@ -76,7 +75,10 @@ commands =
nova.tests.unit.objects.test_tag \
nova.tests.unit.objects.test_vcpu_model \
nova.tests.unit.objects.test_virt_cpu_topology \
nova.tests.unit.objects.test_virtual_interface
nova.tests.unit.objects.test_virtual_interface \
nova.tests.unit.test_crypto \
nova.tests.unit.test_exception \
nova.tests.unit.test_versions
[testenv:functional]
usedevelop = True