diff --git a/functionaltests/api/v1/behaviors/secret_behaviors.py b/functionaltests/api/v1/behaviors/secret_behaviors.py index 5cbe21dc6..1c0b3323b 100644 --- a/functionaltests/api/v1/behaviors/secret_behaviors.py +++ b/functionaltests/api/v1/behaviors/secret_behaviors.py @@ -37,14 +37,21 @@ class SecretBehaviors(base_behaviors.BaseBehaviors): return resp, secret_ref def update_secret_payload(self, secret_ref, payload, payload_content_type, - payload_content_encoding): + payload_content_encoding=None): """Updates a secret's payload data. - :param secret_ref: HATEOS ref of the secret to be deleted - :return: A request response object + :param secret_ref: HATEOS ref of the secret to be updated + :param payload: new payload to be sent to server + :param payload_content_type: value for the Content-Type header + :param payload_content_encoding: value for the Content-Encoding header + :return: the response from the PUT update """ - headers = {'Content-Type': payload_content_type, - 'Content-Encoding': payload_content_encoding} + + if payload_content_encoding is None: + headers = {'Content-Type': payload_content_type} + else: + headers = {'Content-Type': payload_content_type, + 'Content-Encoding': payload_content_encoding} return self.client.put(secret_ref, data=payload, extra_headers=headers) diff --git a/functionaltests/api/v1/smoke/test_rsa.py b/functionaltests/api/v1/smoke/test_rsa.py new file mode 100644 index 000000000..a4884ed6f --- /dev/null +++ b/functionaltests/api/v1/smoke/test_rsa.py @@ -0,0 +1,844 @@ +# Copyright (c) 2015 Cisco Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import base64 +import time + +from Crypto.PublicKey import RSA +from OpenSSL import crypto +from testtools import testcase + +from barbican.tests import utils +from functionaltests.api import base +from functionaltests.api.v1.behaviors import container_behaviors +from functionaltests.api.v1.behaviors import order_behaviors +from functionaltests.api.v1.behaviors import secret_behaviors +from functionaltests.api.v1.models import container_models +from functionaltests.api.v1.models import order_models +from functionaltests.api.v1.models import secret_models +from functionaltests.common import keys + + +def get_private_key_req(bits, payload): + return {'name': 'myprivatekey', + 'payload_content_type': 'application/octet-stream', + 'payload_content_encoding': 'base64', + 'algorithm': 'rsa', + 'bit_length': bits, + 'secret_type': 'private', + 'payload': payload} + + +def get_public_key_req(bits, payload): + return {'name': 'mypublickey', + 'payload_content_type': 'application/octet-stream', + 'payload_content_encoding': 'base64', + 'algorithm': 'rsa', + 'bit_length': bits, + 'secret_type': 'public', + 'payload': payload} + + +def get_passphrase_req(passphrase): + return {'name': 'mypassphrase', + 'payload_content_type': 'text/plain', + 'secret_type': 'passphrase', + 'payload': passphrase} + + +def get_container_req(public_key_ref, private_key_ref, passphrase=None): + if passphrase is None: + return {"name": "rsacontainer", + "type": "rsa", + "secret_refs": [ + {'name': 'public_key', 'secret_ref': public_key_ref}, + {'name': 'private_key', 'secret_ref': private_key_ref}]} + else: + return {"name": "rsacontainer", + "type": "rsa", + "secret_refs": [ + {'name': 'public_key', 'secret_ref': public_key_ref}, + {'name': 'private_key', 'secret_ref': private_key_ref}, + {'name': 'private_key_passphrase', + 'secret_ref': passphrase}]} + + +def get_certificate_req(payload): + return {'name': 'mycertificate', + 'payload_content_type': 'application/octet-stream', + 'payload_content_encoding': 'base64', + 'secret_type': 'certificate', + 'payload': payload} + + +def get_order_create_rsa_container(): + return {'type': 'asymmetric', + "meta": {"name": "ordered rsacontainer", + "algorithm": "rsa", + "bit_length": 1024, + "mode": "cbc", + "payload_content_type": "application/octet-stream"}} + + +def get_order_create_rsa_container_with_passphrase(): + return {'type': 'asymmetric', + "meta": {"name": "ordered rsacontainer", + "algorithm": "rsa", + "bit_length": 1024, + "passphrase": "password", + "mode": "cbc", + "payload_content_type": "application/octet-stream"}} + + +def get_order_create_certificate(container_ref): + return {'type': 'certificate', + 'meta': {'request_type': 'stored-key', + 'container_ref': container_ref, + 'subject_dn': 'cn=server.example.com,o=example.com', + 'requestor_name': 'Barbican User', + 'requestor_email': 'user@example.com', + 'requestor_phone': '555-1212'}} + + +def get_order_create_certificate_simple_cmc(csr): + return {'type': 'certificate', + 'meta': {'request_type': 'simple-cmc', + 'requestor_name': 'Barbican User', + 'requestor_email': 'user@example.com', + 'requestor_phone': '555-1212', + 'request_data': csr}} + + +@utils.parameterized_test_case +class RSATestCase(base.TestCase): + """Positive test cases for all ways of working with RSA keys + + These tests are meant to be 'real'. All input is created + using OpenSSL commands and all results verified by OpenSSL. + """ + + def setUp(self): + super(RSATestCase, self).setUp() + self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client) + self.container_behaviors =\ + container_behaviors.ContainerBehaviors(self.client) + self.order_behaviors =\ + order_behaviors.OrderBehaviors(self.client) + + def tearDown(self): + self.secret_behaviors.delete_all_created_secrets() + self.container_behaviors.delete_all_created_containers() + self.order_behaviors.delete_all_created_orders() + super(RSATestCase, self).tearDown() + + def wait_for_order(self, order_resp, order_ref): + # Make sure we have an active order + time_count = 1 + while order_resp.model.status != "ACTIVE" and time_count <= 4: + time.sleep(1) + time_count += 1 + order_resp = self.order_behaviors.get_order(order_ref) + + @testcase.attr('positive') + def test_rsa_check_input_keys(self): + """Verify the keys input for test cases""" + + # prove pyOpenSSL can parse the original private key + pem = keys.get_private_key_pkcs8() + crypto.load_privatekey(crypto.FILETYPE_PEM, pem) + + # prove pyCrypto can parse the original public key + pem = keys.get_public_key_pem() + RSA.importKey(pem) + + # prove pyOpenSSL can parse the original encrypted private key + pem = keys.get_encrypted_private_key_pkcs8() + passphrase = keys.get_passphrase_txt() + crypto.load_privatekey(crypto.FILETYPE_PEM, + pem, + passphrase) + + # prove OpenSSL can parse the original certificate + pem = keys.get_certificate_pem() + crypto.load_certificate(crypto.FILETYPE_PEM, pem) + + @testcase.skip("Broken pending fix for 1441866") + @testcase.skip("POST comes back with Problem decoding payload") + @testcase.attr('positive') + def test_rsa_create_and_get_private_key(self): + """Create a private key secret with one Post, then Get""" + + # make a secret + bits = 2048 + pem = keys.get_private_key_pkcs8() + + # create with Post to server + test_model = secret_models.SecretModel( + **get_private_key_req(bits, base64.b64encode(pem))) + resp, secret_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # retrieve with Get to server + content_type = 'application/octet-stream' + get_resp = self.secret_behaviors.get_secret(secret_ref, content_type) + self.assertEqual(200, get_resp.status_code) + + # check that returned key is same as original key + self.assertEqual(pem, get_resp.content) + + @testcase.skip("Broken pending fix for 1441866") + @testcase.skip("POST comes back with Problem decoding payload") + @testcase.attr('positive') + def test_rsa_create_and_get_public_key(self): + """Create a public key secret with one Post, then Get""" + + # make a secret + bits = 2048 + pem = keys.get_public_key_pem() + + # create with Post to server + test_model = secret_models.SecretModel( + **get_public_key_req(bits, base64.b64encode(pem))) + resp, secret_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # retrieve with Get to server + content_type = 'application/octet-stream' + get_resp = self.secret_behaviors.get_secret(secret_ref, content_type) + self.assertEqual(200, get_resp.status_code) + + # check that returned pem is same as original pem + self.assertEqual(pem, get_resp.content) + + @testcase.attr('positive') + def test_rsa_two_step_create_and_get_public_key(self): + """Create a public key secret with Post and Put, then Get""" + + # make a secret + bits = 2048 + pem = keys.get_public_key_pem() + + # create with Post to server + create_req = get_public_key_req(bits, base64.b64encode(pem)) + del create_req['payload'] + del create_req['payload_content_type'] + del create_req['payload_content_encoding'] + test_model = secret_models.SecretModel( + **create_req) + resp, secret_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # update with Put to server + update_resp = self.secret_behaviors.update_secret_payload( + secret_ref, + pem, + 'application/octet-stream') + self.assertEqual(204, update_resp.status_code) + + # retrieve with Get to server + content_type = 'application/octet-stream' + get_resp = self.secret_behaviors.get_secret(secret_ref, content_type) + self.assertEqual(200, get_resp.status_code) + + # check that returned pem is same as original pem + self.assertEqual(pem, get_resp.content) + + @testcase.attr('positive') + def test_rsa_two_step_create_and_get_private_key(self): + """Create a private key secret with Post and Put, then Get""" + + # make a secret + bits = 2048 + pem = keys.get_private_key_pkcs8() + + # create with Post to server + create_req = get_private_key_req(bits, base64.b64encode(pem)) + del create_req['payload'] + del create_req['payload_content_type'] + del create_req['payload_content_encoding'] + test_model = secret_models.SecretModel( + **create_req) + resp, secret_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # update with Put to server + update_resp = self.secret_behaviors.update_secret_payload( + secret_ref, + pem, + 'application/octet-stream') + self.assertEqual(204, update_resp.status_code) + + # retrieve with Get to server + content_type = 'application/octet-stream' + get_resp = self.secret_behaviors.get_secret(secret_ref, content_type) + self.assertEqual(200, get_resp.status_code) + + # check that returned pem is same as original pem + self.assertEqual(pem, get_resp.content) + + @testcase.attr('positive') + def test_rsa_create_and_get_passphrase(self): + """Create a passphrase secret with one Post, then Get""" + + # make a secret + passphrase = keys.get_passphrase_txt() + + # create with Post to server + test_model = secret_models.SecretModel( + **get_passphrase_req(passphrase)) + resp, secret_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # retrieve with Get to server + content_type = 'application/octet-stream' + get_resp = self.secret_behaviors.get_secret(secret_ref, content_type) + self.assertEqual(200, get_resp.status_code) + + # check that returned phrase is same as original phrase + self.assertEqual(passphrase, get_resp.content) + + @testcase.skip("POST comes back with Problem decoding payload") + @testcase.attr('positive') + def test_rsa_create_and_get_certificate_secret(self): + """Create a certificate secret with one Post, then Get""" + + # make a secret + pem = keys.get_certificate_pem() + + # create with Post to server + test_model = secret_models.SecretModel( + **get_certificate_req(base64.b64encode(pem))) + resp, secret_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # retrieve with Get to server + content_type = 'application/octet-stream' + get_resp = self.secret_behaviors.get_secret(secret_ref, content_type) + self.assertEqual(200, get_resp.status_code) + + # check that returned certificate is same as original certificate + self.assertEqual(pem, get_resp.content) + + @testcase.attr('positive') + def test_rsa_two_step_create_and_get_certificate_secret(self): + """Create a certificate secret with Post and Put, then Get""" + + # make a secret + pem = keys.get_certificate_pem() + + # create with Post to server + create_req = get_certificate_req(base64.b64encode(pem)) + del create_req['payload'] + del create_req['payload_content_type'] + del create_req['payload_content_encoding'] + test_model = secret_models.SecretModel( + **create_req) + resp, secret_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # update with Put to server + update_resp = self.secret_behaviors.update_secret_payload( + secret_ref, + pem, + 'application/octet-stream', + None) + self.assertEqual(204, update_resp.status_code) + + # retrieve with Get to server + content_type = 'application/octet-stream' + get_resp = self.secret_behaviors.get_secret(secret_ref, content_type) + self.assertEqual(200, get_resp.status_code) + + # check that returned pem is same as original pem + self.assertEqual(pem, get_resp.content) + + @testcase.skip("Create private secret fails with decoding error") + @testcase.attr('positive') + def test_rsa_create_and_get_container(self): + """Create an rsa container with one Post, then Get""" + + # make the secrets + bits = 2048 + private_pem = keys.get_private_key_pkcs8() + public_pem = keys.get_public_key_pem() + + # create private secret with Post to server + test_model = secret_models.SecretModel( + **get_private_key_req(bits, base64.b64encode(private_pem))) + resp, private_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # create public secret with Post to server + test_model = secret_models.SecretModel( + **get_public_key_req(bits, base64.b64encode(public_pem))) + resp, public_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # create container with Post to server + test_model = container_models.ContainerModel( + **get_container_req(public_ref, private_ref)) + resp, container_ref =\ + self.container_behaviors.create_container(test_model) + self.assertEqual(201, resp.status_code) + + # retrieve container with Get to server + get_resp = self.container_behaviors.get_container(container_ref) + self.assertEqual(get_resp.status_code, 200) + self.assertEqual(get_resp.model.secret_refs[0].name, 'public_key') + self.assertEqual(get_resp.model.secret_refs[1].name, 'private_key') + self.assertEqual(get_resp.model.secret_refs[2].name, + 'private_key_passphrase') + + # retrieve public key secret with Get to server + content_type = 'application/octet-stream' + public_get_resp = self.secret_behaviors.get_secret( + get_resp.model.secret_refs[0].secret_ref, content_type) + self.assertEqual(200, public_get_resp.status_code) + + # retrieve private key secret with Get to server + private_get_resp = self.secret_behaviors.get_secret( + get_resp.model.secret_refs[1].secret_ref, content_type) + self.assertEqual(200, private_get_resp.status_code) + + # check that returned secrets are same as original secrets + self.assertEqual(private_pem, private_get_resp.content) + self.assertEqual(public_pem, public_get_resp.content) + + @testcase.skip("Create private secret fails with decoding error") + @testcase.attr('positive') + def test_rsa_create_and_get_container_with_passphrase(self): + """Create an rsa container with one Post, then Get""" + + # make the secrets + bits = 2048 + private_pem = keys.get_encrypted_private_key_pkcs8() + public_pem = keys.get_public_key_pem() + passphrase = keys.get_passphrase_txt() + + # create private secret with Post to server + test_model = secret_models.SecretModel( + **get_private_key_req(bits, base64.b64encode(private_pem))) + resp, private_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # create public secret with Post to server + test_model = secret_models.SecretModel( + **get_public_key_req(bits, base64.b64encode(public_pem))) + resp, public_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # create passphrase with Post to server + test_model = secret_models.SecretModel( + **get_passphrase_req(passphrase)) + resp, passphrase_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # create container with Post to server + test_model = container_models.ContainerModel( + **get_container_req(public_ref, private_ref, passphrase_ref)) + resp, container_ref =\ + self.container_behaviors.create_container(test_model) + self.assertEqual(201, resp.status_code) + + # retrieve container with Get to server + get_resp = self.container_behaviors.get_container(container_ref) + self.assertEqual(get_resp.status_code, 200) + self.assertEqual(get_resp.model.secret_refs[0].name, 'public_key') + self.assertEqual(get_resp.model.secret_refs[1].name, 'private_key') + self.assertEqual(get_resp.model.secret_refs[2].name, + 'private_key_passphrase') + + # retrieve public key secret with Get to server + content_type = 'application/octet-stream' + public_get_resp = self.secret_behaviors.get_secret( + get_resp.model.secret_refs[0].secret_ref, content_type) + self.assertEqual(200, public_get_resp.status_code) + + # retrieve private key secret with Get to server + private_get_resp = self.secret_behaviors.get_secret( + get_resp.model.secret_refs[1].secret_ref, content_type) + self.assertEqual(200, private_get_resp.status_code) + + # retrieve passphrase with Get to server + passphrase_get_resp = self.secret_behaviors.get_secret( + get_resp.model.secret_refs[2].secret_ref, content_type) + self.assertEqual(200, get_resp.status_code) + + # check that returned secrets are same as original secrets + self.assertEqual(private_pem, private_get_resp.content) + self.assertEqual(public_pem, public_get_resp.content) + self.assertEqual(passphrase, passphrase_get_resp.content) + + @testcase.skip("Container is created, but when getting secrets") + @testcase.skip("the returned format is base64, but not PEM") + @testcase.attr('positive') + def test_rsa_order_container(self): + """Order an rsa container with asymmetric keys.""" + + # order an rsa container + test_model = order_models.OrderModel( + **get_order_create_rsa_container()) + create_resp, order_ref = self.order_behaviors.create_order(test_model) + self.assertEqual(create_resp.status_code, 202) + + # get the order + order_resp = self.order_behaviors.get_order(order_ref) + self.assertEqual(order_resp.status_code, 200) + + # get the container + container_resp = self.container_behaviors.get_container( + order_resp.model.container_ref) + self.assertEqual(container_resp.status_code, 200) + + # get the secrets from the container + secret_dict = {} + for secret in container_resp.model.secret_refs: + self.assertIsNotNone(secret.secret_ref) + secret_resp = self.secret_behaviors.get_secret( + secret.secret_ref, "application/octet-stream") + self.assertIsNotNone(secret_resp) + secret_dict[secret.name] = secret_resp.content + + # verify the secrets + self.assertIsNotNone(secret_dict['private_key']) + self.assertIsNotNone(secret_dict['public_key']) + # verify returned keys can be parsed + crypto.load_privatekey(crypto.FILETYPE_PEM, secret_dict['private_key']) + RSA.importKey(secret_dict['public_key']) + + @testcase.skip("Container is created, but when getting secrets") + @testcase.skip("the returned format is base64, but not PEM") + @testcase.attr('positive') + def test_rsa_order_container_with_passphrase(self): + """Order an rsa container with asymmetric keys and a passphrase.""" + + # order an rsa container + test_model = order_models.OrderModel( + **get_order_create_rsa_container_with_passphrase()) + create_resp, order_ref = self.order_behaviors.create_order(test_model) + self.assertEqual(create_resp.status_code, 202) + + # get the order + order_resp = self.order_behaviors.get_order(order_ref) + self.assertEqual(order_resp.status_code, 200) + + # get the container + container_resp = self.container_behaviors.get_container( + order_resp.model.container_ref) + self.assertEqual(container_resp.status_code, 200) + + # get the secrets from the container + secret_dict = {} + for secret in container_resp.model.secret_refs: + self.assertIsNotNone(secret.secret_ref) + secret_resp = self.secret_behaviors.get_secret( + secret.secret_ref, "application/octet-stream") + self.assertIsNotNone(secret_resp) + secret_dict[secret.name] = secret_resp.content + + # verify the secrets + self.assertEqual('password', secret_dict['private_key_passphrase']) + # verify returned keys can be parsed + crypto.load_privatekey(crypto.FILETYPE_PEM, + secret_dict['private_key'], + secret_dict['private_key_passphrase']) + RSA.importKey(secret_dict['public_key']) + + @testcase.attr('positive') + def test_rsa_create_container_from_two_step_secrets(self): + """Order certificate from created rsa container.""" + + # store public key + bits = 2048 + public_pem = keys.get_public_key_pem() + create_req = get_public_key_req(bits, base64.b64encode(public_pem)) + del create_req['payload'] + del create_req['payload_content_type'] + del create_req['payload_content_encoding'] + test_model = secret_models.SecretModel(**create_req) + resp, public_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + # update with Put to server + update_resp = self.secret_behaviors.update_secret_payload( + public_ref, public_pem, 'application/octet-stream') + self.assertEqual(204, update_resp.status_code) + + # store private key + private_pem = keys.get_private_key_pkcs8() + create_req = get_private_key_req(bits, base64.b64encode(private_pem)) + del create_req['payload'] + del create_req['payload_content_type'] + del create_req['payload_content_encoding'] + test_model = secret_models.SecretModel(**create_req) + resp, private_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + update_resp = self.secret_behaviors.update_secret_payload( + private_ref, private_pem, 'application/octet-stream') + self.assertEqual(204, update_resp.status_code) + + # create container with Post to server + test_model = container_models.ContainerModel( + **get_container_req(public_ref, private_ref)) + resp, container_ref =\ + self.container_behaviors.create_container(test_model) + self.assertEqual(201, resp.status_code) + + # get the container + container_resp = self.container_behaviors.get_container(container_ref) + self.assertEqual(container_resp.status_code, 200) + + # get the secrets from the container + secret_dict = {} + for secret in container_resp.model.secret_refs: + self.assertIsNotNone(secret.secret_ref) + secret_resp = self.secret_behaviors.get_secret( + secret.secret_ref, "application/octet-stream") + self.assertIsNotNone(secret_resp) + secret_dict[secret.name] = secret_resp.content + + # check that returned secrets are same as original secrets + self.assertEqual(private_pem, secret_dict['private_key']) + self.assertEqual(public_pem, secret_dict['public_key']) + + @testcase.skip("Broken: order is done, but meta data is not updated") + @testcase.attr('positive') + def test_rsa_order_certificate_from_generated_container(self): + """Order a certificate from generated rsa container.""" + + # order an rsa container + test_model = order_models.OrderModel( + **get_order_create_rsa_container()) + create_resp, order_ref = self.order_behaviors.create_order(test_model) + self.assertEqual(create_resp.status_code, 202) + + # get the container order + order_resp = self.order_behaviors.get_order(order_ref) + self.assertEqual(order_resp.status_code, 200) + + # get the container ref + container_resp = self.container_behaviors.get_container( + order_resp.model.container_ref) + self.assertEqual(container_resp.status_code, 200) + container_ref = order_resp.model.container_ref + + # order an rsa certificate + test_model = order_models.OrderModel( + **get_order_create_certificate(container_ref)) + create_resp, order_ref = self.order_behaviors.create_order(test_model) + self.assertEqual(202, create_resp.status_code) + + # get the certificate order + order_resp = self.order_behaviors.get_order(order_ref) + self.assertEqual(order_resp.status_code, 200) + self.assertTrue(order_resp.model.status == "ACTIVE" or + order_resp.model.status == "PENDING") + + # Wait until order is ACTIVE + self.wait_for_order(order_resp, order_ref) + + # verify order results + # self.assertEqual('ACTIVE', order_resp.model.status) + self.assertIsNotNone(order_resp.model.meta['plugin_name']) + csr = order_resp.model.meta['generated_csr'] + crypto.load_certificate_request(crypto.FILETYPE_PEM, csr) + + @testcase.skip("Broken: order is done, but meta data is not updated") + @testcase.attr('positive') + def test_rsa_order_certificate_from_generated_container_with_pass(self): + """Order certificate from generated rsa container with passphrase.""" + + # order an rsa container + test_model = order_models.OrderModel( + **get_order_create_rsa_container_with_passphrase()) + create_resp, order_ref = self.order_behaviors.create_order(test_model) + self.assertEqual(create_resp.status_code, 202) + + # get the container order + order_resp = self.order_behaviors.get_order(order_ref) + self.assertEqual(order_resp.status_code, 200) + + # get the container ref + container_resp = self.container_behaviors.get_container( + order_resp.model.container_ref) + self.assertEqual(container_resp.status_code, 200) + container_ref = order_resp.model.container_ref + + # order an rsa certificate + test_model = order_models.OrderModel( + **get_order_create_certificate(container_ref)) + create_resp, order_ref = self.order_behaviors.create_order(test_model) + self.assertEqual(202, create_resp.status_code) + + # get the certificate order + order_resp = self.order_behaviors.get_order(order_ref) + self.assertEqual(order_resp.status_code, 200) + self.assertTrue(order_resp.model.status == "ACTIVE" or + order_resp.model.status == "PENDING") + + # Wait until order is ACTIVE + self.wait_for_order(order_resp, order_ref) + + # verify order results + # self.assertEqual('ACTIVE', order_resp.model.status) + self.assertIsNotNone(order_resp.model.meta['plugin_name']) + csr = order_resp.model.meta['generated_csr'] + crypto.load_certificate_request(crypto.FILETYPE_PEM, csr) + + @testcase.skip("Broken: load_privatekey fails during generate_csr") + @testcase.attr('positive') + def test_rsa_order_certificate_from_created_container(self): + """Order certificate from created rsa container.""" + + # store public key + bits = 2048 + public_pem = keys.get_public_key_pem() + create_req = get_public_key_req(bits, base64.b64encode(public_pem)) + del create_req['payload'] + del create_req['payload_content_type'] + del create_req['payload_content_encoding'] + test_model = secret_models.SecretModel(**create_req) + resp, public_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + # update with Put to server + update_resp = self.secret_behaviors.update_secret_payload( + public_ref, public_pem, 'application/octet-stream') + self.assertEqual(204, update_resp.status_code) + + # store private key + private_pem = keys.get_private_key_pkcs8() + create_req = get_private_key_req(bits, base64.b64encode(private_pem)) + del create_req['payload'] + del create_req['payload_content_type'] + del create_req['payload_content_encoding'] + test_model = secret_models.SecretModel(**create_req) + resp, private_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + update_resp = self.secret_behaviors.update_secret_payload( + private_ref, private_pem, 'application/octet-stream') + self.assertEqual(204, update_resp.status_code) + + # create container with Post to server + test_model = container_models.ContainerModel( + **get_container_req(public_ref, private_ref)) + resp, container_ref =\ + self.container_behaviors.create_container(test_model) + self.assertEqual(201, resp.status_code) + + # order an rsa certificate + test_model = order_models.OrderModel( + **get_order_create_certificate(container_ref)) + create_resp, order_ref = self.order_behaviors.create_order(test_model) + self.assertEqual(202, create_resp.status_code) + + # get the certificate order + order_resp = self.order_behaviors.get_order(order_ref) + self.assertEqual(order_resp.status_code, 200) + self.assertTrue(order_resp.model.status == "ACTIVE" or + order_resp.model.status == "PENDING") + + # Wait until order is ACTIVE + self.wait_for_order(order_resp, order_ref) + + # verify order results + # self.assertEqual('ACTIVE', order_resp.model.status) + self.assertIsNotNone(order_resp.model.meta['plugin_name']) + csr = order_resp.model.meta['generated_csr'] + crypto.load_certificate_request(crypto.FILETYPE_PEM, csr) + + @testcase.skip("Broken: load_privatekey fails during generate_csr") + @testcase.attr('positive') + def test_rsa_order_certificate_from_created_container_with_pass(self): + """Order certificate from created rsa container with passphrase.""" + + # store public key + bits = 2048 + public_pem = keys.get_public_key_pem() + create_req = get_public_key_req(bits, base64.b64encode(public_pem)) + del create_req['payload'] + del create_req['payload_content_type'] + del create_req['payload_content_encoding'] + test_model = secret_models.SecretModel(**create_req) + resp, public_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + # update with Put to server + update_resp = self.secret_behaviors.update_secret_payload( + public_ref, public_pem, 'application/octet-stream') + self.assertEqual(204, update_resp.status_code) + + # store private key + private_pem = keys.get_private_key_pkcs8() + create_req = get_private_key_req(bits, base64.b64encode(private_pem)) + del create_req['payload'] + del create_req['payload_content_type'] + del create_req['payload_content_encoding'] + test_model = secret_models.SecretModel(**create_req) + resp, private_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + update_resp = self.secret_behaviors.update_secret_payload( + private_ref, private_pem, 'application/octet-stream') + self.assertEqual(204, update_resp.status_code) + + # store the passphrase + passphrase = keys.get_passphrase_txt() + test_model = secret_models.SecretModel( + **get_passphrase_req(passphrase)) + resp, passphrase_ref = self.secret_behaviors.create_secret(test_model) + self.assertEqual(201, resp.status_code) + + # create the container + test_model = container_models.ContainerModel( + **get_container_req(public_ref, private_ref, passphrase_ref)) + resp, container_ref =\ + self.container_behaviors.create_container(test_model) + self.assertEqual(201, resp.status_code) + + # order an rsa certificate + test_model = order_models.OrderModel( + **get_order_create_certificate(container_ref)) + create_resp, order_ref = self.order_behaviors.create_order(test_model) + self.assertEqual(202, create_resp.status_code) + + # get the certificate order + order_resp = self.order_behaviors.get_order(order_ref) + self.assertEqual(order_resp.status_code, 200) + self.assertTrue(order_resp.model.status == "ACTIVE" or + order_resp.model.status == "PENDING") + + # Wait until order is ACTIVE + self.wait_for_order(order_resp, order_ref) + + # verify order results + # self.assertEqual('ACTIVE', order_resp.model.status) + self.assertIsNotNone(order_resp.model.meta['plugin_name']) + csr = order_resp.model.meta['generated_csr'] + crypto.load_certificate_request(crypto.FILETYPE_PEM, csr) + + @testcase.skip("Broken: order is done, but meta data is not updated") + @testcase.attr('positive') + def test_rsa_order_certificate_from_csr(self): + """Order certificate from csr""" + + # order an rsa certificate + csr = keys.get_csr_pem() + test_model = order_models.OrderModel( + **get_order_create_certificate_simple_cmc(csr)) + create_resp, order_ref = self.order_behaviors.create_order(test_model) + self.assertEqual(202, create_resp.status_code) + + # get the certificate order + order_resp = self.order_behaviors.get_order(order_ref) + self.assertEqual(order_resp.status_code, 200) + self.assertTrue(order_resp.model.status == "ACTIVE" or + order_resp.model.status == "PENDING") + self.assertIsNotNone(order_resp.model.meta['plugin_name']) diff --git a/functionaltests/common/keys.py b/functionaltests/common/keys.py new file mode 100644 index 000000000..25e89a3c3 --- /dev/null +++ b/functionaltests/common/keys.py @@ -0,0 +1,201 @@ +# Copyright (c) 2015 Cisco Systems +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def get_private_key_pkcs8(): + """Returns a private key in PCKS#8 format + + This key was created by issuing the following openssl commands: + + openssl genrsa -out private.pem 2048 + openssl pkcs8 -topk8 -nocrypt -in private.pem -out private.pk8 + + The byte string returned by this function is the contents + of the private.pk8 file. + """ + + return """-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCza2VoDXmBUMmw +jFu9F6MM5q/AZ1WjnWA2YNdNy237TrGN/nobDDv8FBBpUPmHNZ04H1LyxFcP8ReF +rcIXpifsReu2lAWaqRPxovu5CuAhfecKv+RhjLVLJ0I+MZIb72ROKpfZTmb7dhlF +gGD3vkC51BCfhGVW35w52OY/23x5MeO4yvx5myPccnxMVQ42KuDrzKqjBlSjmBnc +pGYx0JgCT+syFmHsl8rOkqCPPFLo24YQn+4/pr1AYwaZAbMTl9zoLtEQj6sxScuH +cS9e8niptDxlsbLQgqGVaGdE117stC95QH7UvITbuYzdjZwBFc1Sgz8GZ/2hLSsH +ujJiIQcvAgMBAAECggEAMOlUKbuSpigp85Ev6Sqqbnfs7Zy+Ae6DLg/UYgbVIq9f +RABdtUXujFfD6ZIDlFKPW59ec4QG3/evm+e0g9HuDEE7cviDVphFMZhm2xkV5Mt3 +0rxhPB6pxaUcL+w/kpH+XDjMUJdJB8A4P3Qx+xfIeWBQb8wd/ELVSgfRLRNeqYL0 +0KXVs04/FOBEhqSiqi/oHYJ4gxNrSoINX71PHVbaEikIygzi4HZVyMut3LE6ceHz +fSj71ftn+Ui0TzkLOb+NoBP31haHC/sfCrpKg7QtUP9q9dRq6dZcI17q5d7oEdET +eDRKhT2vm7bx2bLGeF1w2H9B/V81upjiAah2RVnecQKBgQDsfHSjR1gd+SHw/2A9 +SaXS1k9LeXLt+UbDQdbjYOsh5LoT+EN/utO70RyDYqjlhzqJzciKTuAW5SVPC6gQ +uCppA29Kntq7x1+Lw/4wG947poXb60tLdg3BK5mBFTORk5ATqAwVq7t+2NtS5S/J +unzs5xrRolDFnSX4KnvVl6Jj3QKBgQDCOXZTVXRPEFhnqnqLErZe6EJkySwG8wgt +OdCmr660bocY1i9vV+RaM1iARHX6u/suMhkz+3KRinzxIG5gQsyiWmTpFV298W9v +kRtmsCQDn2my90yv4e6sLI0ng7l/N3r7CwLLNIV/CqeyaN40suzE8AjgEga5jTua +6bP5m+x8ewKBgQCeuW3DxXfkLjnUumMK36qX11XDb5FvHjebiE5FsOBAkHdAPgp3 +6ZqBXfoISSjZXakxotft1MDdPRGMe2NjTWjRsQd6iyJ+lHORqIusGJhRaxQ/Ji8U +R/k1ZSETnXpORD+YodrylKA0pDKY8dDgUfXVP8wlVg9mg3JfnYweMTdCVQKBgQCx +133iNmgmkTfxzGci+wJkitVohdA7mMOO7daBGnKlImOvuUd784XTlhpecNF6wi/w +D82GDKLOY3meLO0EVYYczxqBVqAccXtxM/RcJcMEUi6twcXFcuJhYvXpDbOHqlyA +jIeFW9U1C6OcOGvm40Lr3UKzMa5Yrtq6MW4ri7uSCwKBgQDfdqVjT4uXmGwOh1z4 +Pzv6GCoc+6GobXg4DvvCUjP9MR+2+5sX0AY/f+aVCD05/Nj0RqpAwUc03zZU5ZtL +2uNe6XDjEugfFtlzea6+rbD6KpFS+nxPJA8YyWYRpNhpRWGWQakHedr3BtMtGs0h +pKNAQG72HKWtSfJQMXvn2RlicA== +-----END PRIVATE KEY-----""" + + +def get_public_key_pem(): + """Returns a public key in PEM format + + This key was created by issuing the following openssl commands: + + openssl genrsa -out private.pem 2048 + openssl rsa -in private.pem -pubout > public.pem + + The byte string returned by this function is the contents + of the public.pem file. + """ + + return """-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAs2tlaA15gVDJsIxbvRej +DOavwGdVo51gNmDXTctt+06xjf56Gww7/BQQaVD5hzWdOB9S8sRXD/EXha3CF6Yn +7EXrtpQFmqkT8aL7uQrgIX3nCr/kYYy1SydCPjGSG+9kTiqX2U5m+3YZRYBg975A +udQQn4RlVt+cOdjmP9t8eTHjuMr8eZsj3HJ8TFUONirg68yqowZUo5gZ3KRmMdCY +Ak/rMhZh7JfKzpKgjzxS6NuGEJ/uP6a9QGMGmQGzE5fc6C7REI+rMUnLh3EvXvJ4 +qbQ8ZbGy0IKhlWhnRNde7LQveUB+1LyE27mM3Y2cARXNUoM/Bmf9oS0rB7oyYiEH +LwIDAQAB +-----END PUBLIC KEY-----""" + + +def get_encrypted_private_key_pkcs8(): + """Returns an encrypted private key in PKCS#8 format + + This key was created by issuing the following openssl commands: + + openssl genrsa -out private.pem 2048 + echo password > passphrase.txt + openssl pkcs8 -topk8 -passout file:passphrase.txt \ + -in private.pem -out private_encrypted.pk8 + + The byte string returned by this function is the contents + of the private_encrypted.pk8 file. + """ + + return """-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIE6TAbBgkqhkiG9w0BBQMwDgQIssadeQrYhhACAggABIIEyDNw3SV2b19yy4Q/ +kTbtJ/p2X2zKDqr7GgLeAowqqhcMfvprI7G8C0XtwxkR4SjMZUXNcmOwQB2kNKtK +ZilCz6pSx81iUj4s1fU460XkhkIeV+F7aB2PsTG1oDfPCuzKFjT6EuSE6lFUH89r +TRuHWMPseW7lrvEB5kNMFag5QxeKjsSCNkZWOT74o4fh3cEplgCEaA+nCclXU79m +5rhaa9e1SUpPuPlhnAIDkBtHcC38B+SOYKQxLdZT1f72oZ1ozWJ4bEhKxvnNu5J+ +tCvgWOXMIEJVGgf8Cu58PoR18LyyAIk7zza+1LkCiyuLNgiz8a1sVw8uBcrVgD5R +8f4XgI/Yjb16Bmpp/0iEjNcURaby9GnCCEc+W/ivSJTnG3o1Xn00FO98l2aggNpt +S8gxK05NeCtdWoFFjTeIXxnb1ct0Iep8RwuO+FnupAf6aw12Uqj4qYNvNiY/kBhS +P/Yd3KznasrolUZ9+PVTMUI45UTMN/XhNvXrozMq9nItWTV7wHyEL3mrYipvcxrm +SnLlAp2zkmSu923cHN1teLE99/rV2jaBM03ROqvYWaxjfOjxfwz6PhdE8G//kWd0 +tf2Om+fyCkBRxo1sUcuiE79hJXgP5KJCMbPsDyG/aQk4oeS1nbn15AhthwiU7A13 +h9X6asgV2H+4Ljf+tr1b8p3qj3CSljfzoVErLqoHagjVB45WktHhrWbUSRpXSvPo +Hh0LY62qxTa67gKjwarH5hYr5IaH39iR9bcyuvzE+u9TJWvWmeLJ7UmesfVPZtSf +/JTpvr0zu4C95lXKt4FdxOhGcWwDN1Zp+lCsF5ruBGc+/pEggiXi1qvW9xUny1Of +8NqdPxGPb4/zPHGaysypPsc6LiY3esI8wa7FnDsS4e79dWinD/BPWEa5N2jLm0Rr +njkHTy0xtnw/a8Ofrtyy9V1tBBOCaswzGIZZj6oHyFCtAvjZuYa8TWVmSi6EqJKi +lY5wSdQQXg3H0HnQYivtOY1YbfjtRkUB9e4xkSVhvYJpY1QWBtApdUGBsxsELkDC +6cv/Kxnd9U7dz9+VhD0hAdrhFqbWqOEGTWt7xE44yzWokdKQWu5FsTs6gyXsGPen +ZgZlR5pjPNGbMdftW0M473YyvtzjrCuSVgJspCzpA9uo6wfejaFb4RF/tcWtXglE +Q5FzfsO1OZr6nONraShj9N1kxGBXUUOtAjZI/zoTWk3yndxw3IpvPtDTg9ByCp7F +RFUtDyrki+YAIAiTgPq7qwc1upjU7R1Zlg4jIe0RI9A73NyLwa4QhgO+HmRBt7At +LLuUeCFKuXMBHzlDaMYwq5ZPOb8VcMkhUoug2YJIc4YOOHh5O0mYnat0vaYO+A58 +DiuYgxKmO5+6+OMk2ovZgk1sFawR4rk9HUt8goUUptZ+hoHUVGtte5YcQniIOcds +qY3ni/zwswHWQRaAu8Ej4qJKt1XwZo2K04xHhL90TMaY8NpLSMCfVqDDL409TqIj +zHUfYl6N2Me4eKc8vl6Sm63g57NzLqTttD6KSn8v+OmUF5mOQwcLnr3nK7S+BQfI +DLPY1Oh7Kec/M/d1080/Qv9YBAJhz50TLKoxXwVeH4OOvuaHVaotElMkr5QEkEXl +gRgwkbMrQjg0II0O9g== +-----END ENCRYPTED PRIVATE KEY-----""" + + +def get_passphrase_txt(): + """Returns the plain text string used to encrypt the private key + + This key was created by issuing the following commands: + + echo password > passphrase.txt + + The byte string returned by this function is the contents + of the passphrase.txt file. + """ + + return """password""" + + +def get_csr_pem(): + """Returns a Certificate Signing Request (CSR) in PEM format + + This key was created by issuing the following openssl commands: + + openssl genrsa -out private.pem 2048 + openssl req -new -key private.pem -out csr.pem -subj '/CN=example.com' + + The byte string returned by this function is the contents + of the csr.pem file. + """ + + return """-----BEGIN CERTIFICATE REQUEST----- +MIICWzCCAUMCAQAwFjEUMBIGA1UEAwwLZXhhbXBsZS5jb20wggEiMA0GCSqGSIb3 +DQEBAQUAA4IBDwAwggEKAoIBAQCza2VoDXmBUMmwjFu9F6MM5q/AZ1WjnWA2YNdN +y237TrGN/nobDDv8FBBpUPmHNZ04H1LyxFcP8ReFrcIXpifsReu2lAWaqRPxovu5 +CuAhfecKv+RhjLVLJ0I+MZIb72ROKpfZTmb7dhlFgGD3vkC51BCfhGVW35w52OY/ +23x5MeO4yvx5myPccnxMVQ42KuDrzKqjBlSjmBncpGYx0JgCT+syFmHsl8rOkqCP +PFLo24YQn+4/pr1AYwaZAbMTl9zoLtEQj6sxScuHcS9e8niptDxlsbLQgqGVaGdE +117stC95QH7UvITbuYzdjZwBFc1Sgz8GZ/2hLSsHujJiIQcvAgMBAAGgADANBgkq +hkiG9w0BAQsFAAOCAQEAPJDIxzgtUDRgpfTbTOPDJYap+Lm4jYxsCuAFbYiQ43B+ +c7RyzEFOB2anrldTm3XzNytHZAkRTnN4dH09p1K1Pyepv+weSv8rvN9OohfYgpcj +wQqw8ksdGb3Q6oPnTgGxmWvV4PbzHmDnOvOiQ+wuBHWXYks6tdgU7iCZ1djYibmL +1j+XEvtstou8gu1lWhzH6tStwmA9udncg5rEvfDUDyvMN3T06QFqrlK9K1TXIlbM +RvUDrBjINIOuEeZ/5czjBl1CX1Z1YqdunrPiCQM4+oUAtjyD6ZAsyAEXLKdSYtKZ +hSZgIl7v+UAIM+9bhpVg15aTjRzfH2OsZodFIbsMDw== +-----END CERTIFICATE REQUEST-----""" + + +def get_certificate_pem(): + """Returns an X509 certificate in PEM format + + This key was created by issuing the following openssl commands: + + openssl genrsa -out private.pem 2048 + openssl req -new -x509 -key private.pem -out cert.pem \ + -days 1000 -subj '/CN=example.com' + + The byte string returned by this function is the contents + of the cert.pem file. + """ + + return """-----BEGIN CERTIFICATE----- +MIIC/zCCAeegAwIBAgIJAOLqXKJ9q9/nMA0GCSqGSIb3DQEBCwUAMBYxFDASBgNV +BAMMC2V4YW1wbGUuY29tMB4XDTE1MDQxMTAyMTUyOVoXDTE4MDEwNTAyMTUyOVow +FjEUMBIGA1UEAwwLZXhhbXBsZS5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw +ggEKAoIBAQCza2VoDXmBUMmwjFu9F6MM5q/AZ1WjnWA2YNdNy237TrGN/nobDDv8 +FBBpUPmHNZ04H1LyxFcP8ReFrcIXpifsReu2lAWaqRPxovu5CuAhfecKv+RhjLVL +J0I+MZIb72ROKpfZTmb7dhlFgGD3vkC51BCfhGVW35w52OY/23x5MeO4yvx5myPc +cnxMVQ42KuDrzKqjBlSjmBncpGYx0JgCT+syFmHsl8rOkqCPPFLo24YQn+4/pr1A +YwaZAbMTl9zoLtEQj6sxScuHcS9e8niptDxlsbLQgqGVaGdE117stC95QH7UvITb +uYzdjZwBFc1Sgz8GZ/2hLSsHujJiIQcvAgMBAAGjUDBOMB0GA1UdDgQWBBSUq2A0 +b2Xo+sKvmKgN8Wq8l6j82jAfBgNVHSMEGDAWgBSUq2A0b2Xo+sKvmKgN8Wq8l6j8 +2jAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBjiuqhlzNVOVLrHDQy +Gr0fTACFJdDREnuhZp4d91++DmMCT+bcTG0+GCp3rfFOuEWpJLLLPdSOnIsnibsO +syKPXuBBX5kmdYIojbdjUTSwnhcx9JTAfKSmxXWSC0rnKCefAf44Mm6fqvoTyTbe +GSQP6nHzc7eLaK/efcrMvYdct+TeTkHjqR8Lu4pjZvRdUQadQHhDyN+ONKdKD9Tr +jvfPim0b7Aq885PjSN6Qo4Z9HXR6+nK+bTz9HyUATMfDGNQt0L3vyfVxbNOxkCBc +YI4hFtGfkOzd6B7r2sY1wGKdTLHkuT4m4/9A/SOzvnH+epnJqIS9jw+1iRj8xcDA +6PNT +-----END CERTIFICATE-----"""