From 2981bce61d66e5028f12ecbf1a3367d5c3b637d7 Mon Sep 17 00:00:00 2001 From: "OTSUKA, Yuanying" Date: Tue, 21 Jun 2016 10:55:23 +0900 Subject: [PATCH] Make 'signing csr' accept Unicode CA Private key `x509.operations.sign` method will raise error if text type of CA Private key is given. This patch fixes it. Change-Id: I6803612cc7f2ca8c9910f9c4886dba541c913ec0 --- magnum/common/x509/operations.py | 26 ++++++++++++++----- .../tests/unit/common/x509/test_operations.py | 5 ++-- magnum/tests/unit/common/x509/test_sign.py | 24 +++++++++++++++++ 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/magnum/common/x509/operations.py b/magnum/common/x509/operations.py index 07fd7e6ca9..4ed1fb35ba 100644 --- a/magnum/common/x509/operations.py +++ b/magnum/common/x509/operations.py @@ -151,6 +151,22 @@ def _generate_certificate(issuer_name, subject_name, extensions, ca_key=None, return keypairs +def _load_pem_private_key(ca_key, ca_key_password=None): + if not isinstance(ca_key, rsa.RSAPrivateKey): + if isinstance(ca_key, six.text_type): + ca_key = six.b(str(ca_key)) + if isinstance(ca_key_password, six.text_type): + ca_key_password = six.b(str(ca_key_password)) + + ca_key = serialization.load_pem_private_key( + ca_key, + password=ca_key_password, + backend=default_backend() + ) + + return ca_key + + def sign(csr, issuer_name, ca_key, ca_key_password=None, skip_validation=False): """Sign a given csr @@ -163,10 +179,7 @@ def sign(csr, issuer_name, ca_key, ca_key_password=None, :returns: generated certificate """ - if not isinstance(ca_key, rsa.RSAPrivateKey): - ca_key = serialization.load_pem_private_key(ca_key, - password=ca_key_password, - backend=default_backend()) + ca_key = _load_pem_private_key(ca_key, ca_key_password) if not isinstance(issuer_name, six.text_type): issuer_name = six.text_type(issuer_name.decode('utf-8')) @@ -213,9 +226,8 @@ def sign(csr, issuer_name, ca_key, ca_key_password=None, def decrypt_key(encrypted_key, password): - private_key = serialization.load_pem_private_key( - encrypted_key, password=password, backend=default_backend() - ) + private_key = _load_pem_private_key(encrypted_key, password) + decrypted_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, diff --git a/magnum/tests/unit/common/x509/test_operations.py b/magnum/tests/unit/common/x509/test_operations.py index aa6f43bdfe..b6e1ce0e77 100644 --- a/magnum/tests/unit/common/x509/test_operations.py +++ b/magnum/tests/unit/common/x509/test_operations.py @@ -25,7 +25,7 @@ class TestX509Operations(base.BaseTestCase): @mock.patch.object(serialization, 'NoEncryption') @mock.patch.object(operations, 'default_backend') - @mock.patch.object(serialization, 'load_pem_private_key') + @mock.patch.object(operations, '_load_pem_private_key') def test_decrypt_key(self, mock_load_pem_private_key, mock_default_backend, mock_no_encryption_class): mock_private_key = mock.MagicMock() @@ -36,8 +36,7 @@ class TestX509Operations(base.BaseTestCase): mock.sentinel.passphrase) mock_load_pem_private_key.assert_called_once_with( - mock.sentinel.key, password=mock.sentinel.passphrase, - backend=mock_default_backend.return_value) + mock.sentinel.key, mock.sentinel.passphrase) mock_private_key.private_bytes.assert_called_once_with( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, diff --git a/magnum/tests/unit/common/x509/test_sign.py b/magnum/tests/unit/common/x509/test_sign.py index 459e236257..7771c90b7f 100644 --- a/magnum/tests/unit/common/x509/test_sign.py +++ b/magnum/tests/unit/common/x509/test_sign.py @@ -72,6 +72,13 @@ class TestX509(base.BaseTestCase): serialization.PublicFormat.SubjectPublicKeyInfo ) + def _private_bytes(self, private_key): + return private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ) + def _generate_private_key(self): return rsa.generate_private_key( public_exponent=65537, @@ -183,6 +190,23 @@ class TestX509(base.BaseTestCase): self.assertInClientExtensions(cert) + def test_load_pem_private_key_with_bytes_private_key(self): + private_key = self._generate_private_key() + private_key = self._private_bytes(private_key) + + self.assertIsInstance(private_key, six.binary_type) + private_key = operations._load_pem_private_key(private_key) + self.assertIsInstance(private_key, rsa.RSAPrivateKey) + + def test_load_pem_private_key_with_unicode_private_key(self): + private_key = self._generate_private_key() + private_key = self._private_bytes(private_key) + private_key = six.text_type(private_key.decode('utf-8')) + + self.assertIsInstance(private_key, six.text_type) + private_key = operations._load_pem_private_key(private_key) + self.assertIsInstance(private_key, rsa.RSAPrivateKey) + @mock.patch('cryptography.x509.load_pem_x509_csr') @mock.patch('six.b') def test_sign_with_unicode_csr(self, mock_six, mock_load_pem):