From 0051bd4be05ee11f8033a744a04c106a24045518 Mon Sep 17 00:00:00 2001 From: Dmitry Tantsur Date: Thu, 26 Aug 2021 19:18:44 +0200 Subject: [PATCH] Support uploading certificates for virtual media This change implements the new feature of VirtualMedia 1.4.0: custom TLS certificates. Adding/listing/deleting/replacing certificates is supported, but creating a CSR is not. Change-Id: I30a5c673ab91486cafbe1383c5fe58142102a7c2 --- .../certificate-service-ff8061143d454313.yaml | 4 + sushy_tools/emulator/api_utils.py | 7 +- .../controllers/certificate_service.py | 68 +++++++++ .../emulator/controllers/virtual_media.py | 63 ++++++++- sushy_tools/emulator/main.py | 2 + sushy_tools/emulator/resources/vmedia.py | 69 ++++++++++ .../emulator/templates/certificate.json | 10 ++ .../templates/certificate_service.json | 14 ++ sushy_tools/emulator/templates/root.json | 7 +- .../controllers/test_certificate_service.py | 104 ++++++++++++++ .../controllers/test_virtual_media.py | 92 ++++++++++++- .../unit/emulator/resources/test_vmedia.py | 129 ++++++++++++++++++ 12 files changed, 557 insertions(+), 12 deletions(-) create mode 100644 releasenotes/notes/certificate-service-ff8061143d454313.yaml create mode 100644 sushy_tools/emulator/controllers/certificate_service.py create mode 100755 sushy_tools/emulator/templates/certificate.json create mode 100755 sushy_tools/emulator/templates/certificate_service.json create mode 100644 sushy_tools/tests/unit/emulator/controllers/test_certificate_service.py diff --git a/releasenotes/notes/certificate-service-ff8061143d454313.yaml b/releasenotes/notes/certificate-service-ff8061143d454313.yaml new file mode 100644 index 00000000..16c303d8 --- /dev/null +++ b/releasenotes/notes/certificate-service-ff8061143d454313.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Adds basic support for custom TLS certificates with virtual media. diff --git a/sushy_tools/emulator/api_utils.py b/sushy_tools/emulator/api_utils.py index 6a840343..62e97a06 100644 --- a/sushy_tools/emulator/api_utils.py +++ b/sushy_tools/emulator/api_utils.py @@ -66,10 +66,11 @@ def returns_json(decorated_func): if isinstance(response, flask.Response): return response if isinstance(response, tuple): - contents, status = response + contents, status, *headers = response else: - contents, status = response, 200 + contents, status, headers = response, 200, () + kwargs = {'headers': headers[0]} if headers else {} return flask.Response(response=contents, status=status, - content_type='application/json') + content_type='application/json', **kwargs) return decorator diff --git a/sushy_tools/emulator/controllers/certificate_service.py b/sushy_tools/emulator/controllers/certificate_service.py new file mode 100644 index 00000000..0b3280b4 --- /dev/null +++ b/sushy_tools/emulator/controllers/certificate_service.py @@ -0,0 +1,68 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + +import flask + +from sushy_tools.emulator import api_utils +from sushy_tools import error + + +certificate_service = flask.Blueprint( + 'CertificateService', __name__, + url_prefix='/redfish/v1/CertificateService') + + +_VMEDIA_URI_PATTERN = re.compile( + '/redfish/v1/Managers/([^/]+)/VirtualMedia/([^/]+)/Certificates/([^/]+)/?$' +) + + +@certificate_service.route('/', methods=['GET']) +@api_utils.returns_json +def certificate_service_resource(): + api_utils.debug('Serving certificate service') + return flask.render_template('certificate_service.json') + + +@certificate_service.route('/Actions/CertificateService.ReplaceCertificate', + methods=['POST']) +@api_utils.ensure_instance_access +@api_utils.returns_json +def certificate_service_replace_certificate(): + if not flask.request.json: + raise error.BadRequest("Empty or malformed certificate") + + try: + cert_string = flask.request.json['CertificateString'] + cert_type = flask.request.json['CertificateType'] + cert_uri = flask.request.json['CertificateUri'] + except KeyError as exc: + raise error.BadRequest(f"Missing required parameter {exc}") + + match = _VMEDIA_URI_PATTERN.search(cert_uri) + if not match: + raise error.NotFound( + f"Certificates at URI {cert_uri} are not supported") + + if cert_type != 'PEM': + raise error.BadRequest( + f"Only PEM certificates are supported, got {cert_type}") + + manager_id, device, cert_id = match.groups() + + flask.current_app.managers.get_manager(manager_id) + flask.current_app.vmedia.replace_certificate( + manager_id, device, cert_id, cert_string, cert_type) + + return '', 204 diff --git a/sushy_tools/emulator/controllers/virtual_media.py b/sushy_tools/emulator/controllers/virtual_media.py index 3b027eb8..9a46ed2c 100644 --- a/sushy_tools/emulator/controllers/virtual_media.py +++ b/sushy_tools/emulator/controllers/virtual_media.py @@ -69,6 +69,7 @@ def virtual_media_resource(identity, device): @virtual_media.route('/', methods=['PATCH']) @api_utils.returns_json def virtual_media_patch(identity, device): + flask.current_app.managers.get_manager(identity) if not flask.request.json: raise error.BadRequest("Empty or malformed patch") @@ -90,24 +91,78 @@ def virtual_media_patch(identity, device): @virtual_media.route('//Certificates', methods=['GET']) @api_utils.returns_json def virtual_media_certificates(identity, device): + flask.current_app.managers.get_manager(identity) location = \ f'/redfish/v1/Managers/{identity}/VirtualMedia/{device}/Certificates' + certificates = flask.current_app.vmedia.list_certificates(identity, device) return flask.render_template( 'certificate_collection.json', location=location, - # TODO(dtantsur): implement - certificates=[], + certificates=[cert.id for cert in certificates], ) @virtual_media.route('//Certificates', methods=['POST']) @api_utils.returns_json def virtual_media_add_certificate(identity, device): + flask.current_app.managers.get_manager(identity) if not flask.request.json: raise error.BadRequest("Empty or malformed certificate") - # TODO(dtantsur): implement - raise error.NotSupportedError("Not implemented") + try: + cert_string = flask.request.json['CertificateString'] + cert_type = flask.request.json['CertificateType'] + except KeyError as exc: + raise error.BadRequest(f"Missing required parameter {exc}") + + if cert_type != 'PEM': + raise error.BadRequest( + f"Only PEM certificates are supported, got {cert_type}") + + api_utils.debug('Adding certificate for virtual media %s at ' + 'manager "%s"', device, identity) + + cert = flask.current_app.vmedia.add_certificate( + identity, device, cert_string, cert_type) + + location = ( + f'/redfish/v1/Managers/{identity}/VirtualMedia/{device}' + f'/Certificates/{cert.id}' + ) + return '', 204, {'Location': location} + + +@virtual_media.route('//Certificates/', methods=['GET']) +@api_utils.returns_json +def virtual_media_get_certificate(identity, device, cert_id): + flask.current_app.managers.get_manager(identity) + location = ( + f'/redfish/v1/Managers/{identity}/VirtualMedia/{device}' + f'/Certificates/{cert_id}' + ) + certificates = flask.current_app.vmedia.list_certificates(identity, device) + try: + cert = next(c for c in certificates if c.id == cert_id) + except StopIteration: + raise error.NotFound() + + return flask.render_template( + 'certificate.json', + location=location, + cert_id=cert.id, + cert_string=cert.string, + cert_type=cert.type_, + ) + + +@virtual_media.route('//Certificates/', methods=['DELETE']) +@api_utils.returns_json +def virtual_media_delete_certificate(identity, device, cert_id): + flask.current_app.managers.get_manager(identity) + api_utils.debug('Removing certificate %s for virtual media %s at ' + 'manager "%s"', cert_id, device, identity) + flask.current_app.vmedia.delete_certificate(identity, device, cert_id) + return '', 204 @virtual_media.route('//Actions/VirtualMedia.InsertMedia', diff --git a/sushy_tools/emulator/main.py b/sushy_tools/emulator/main.py index f199b373..832dc4bf 100755 --- a/sushy_tools/emulator/main.py +++ b/sushy_tools/emulator/main.py @@ -25,6 +25,7 @@ from ironic_lib import auth_basic from werkzeug import exceptions as wz_exc from sushy_tools.emulator import api_utils +from sushy_tools.emulator.controllers import certificate_service as certctl from sushy_tools.emulator.controllers import virtual_media as vmctl from sushy_tools.emulator import memoize from sushy_tools.emulator.resources import chassis as chsdriver @@ -153,6 +154,7 @@ class Application(flask.Flask): app = Application() +app.register_blueprint(certctl.certificate_service) app.register_blueprint(vmctl.virtual_media) diff --git a/sushy_tools/emulator/resources/vmedia.py b/sushy_tools/emulator/resources/vmedia.py index 5df7fcfd..3292cda4 100644 --- a/sushy_tools/emulator/resources/vmedia.py +++ b/sushy_tools/emulator/resources/vmedia.py @@ -30,6 +30,11 @@ DeviceInfo = collections.namedtuple( 'DeviceInfo', ['image_name', 'image_url', 'inserted', 'write_protected', 'username', 'password', 'verify']) +Certificate = collections.namedtuple( + 'Certificate', + ['id', 'string', 'type_']) + +_CERT_ID = "Default" class StaticDriver(base.DriverBase): @@ -150,6 +155,47 @@ class StaticDriver(base.DriverBase): device_info['Verify'] = verify self._devices[(identity, device)] = device_info + def add_certificate(self, identity, device, cert_string, cert_type): + device_info = self._get_device(identity, device) + + if "Certificate" in device_info: + raise error.FishyError("Virtual media certificate already exists", + code=409) + + device_info["Certificate"] = {'Type': cert_type, 'String': cert_string} + self._devices[(identity, device)] = device_info + + return Certificate(_CERT_ID, cert_string, cert_type) + + def replace_certificate(self, identity, device, cert_id, + cert_string, cert_type): + device_info = self._get_device(identity, device) + if cert_id != _CERT_ID or "Certificate" not in device_info: + raise error.NotFound(f"Certificate {cert_id} not found") + + device_info["Certificate"] = {'Type': cert_type, 'String': cert_string} + self._devices[(identity, device)] = device_info + + return Certificate(_CERT_ID, cert_string, cert_type) + + def list_certificates(self, identity, device): + device_info = self._get_device(identity, device) + try: + certificate = device_info["Certificate"] + except KeyError: + return [] + + return [Certificate(_CERT_ID, certificate['String'], + certificate['Type'])] + + def delete_certificate(self, identity, device, cert_id): + device_info = self._get_device(identity, device) + if cert_id != _CERT_ID or "Certificate" not in device_info: + raise error.NotFound(f"Certificate {cert_id} not found") + + del device_info["Certificate"] + self._devices[(identity, device)] = device_info + def _write_from_response(self, image_url, rsp, tmp_file): with open(tmp_file.name, 'wb') as fl: for chunk in rsp.iter_content(chunk_size=8192): @@ -193,8 +239,28 @@ class StaticDriver(base.DriverBase): # NOTE(dtantsur): it's de facto standard for Redfish to default # to no certificate validation. self._config.get('SUSHY_EMULATOR_VMEDIA_VERIFY_SSL', False)) + custom_cert = None + if verify_media_cert: + try: + custom_cert = device_info['Certificate']['String'] + except KeyError: + self._logger.debug( + 'TLS verification is enabled but not custom certificate ' + 'is provided, using built-in CA for manager %s, virtual ' + 'media device %s', identity, device) + else: + self._logger.debug( + 'Using a custom TLS certificate for manager %s, virtual ' + 'media device %s', identity, device) + auth = (username, password) if (username and password) else None + if custom_cert is not None: + custom_cert_file = tempfile.NamedTemporaryFile(mode='wt') + custom_cert_file.write(custom_cert) + custom_cert_file.flush() + verify_media_cert = custom_cert_file.name + try: with requests.get(image_url, stream=True, @@ -229,6 +295,9 @@ class StaticDriver(base.DriverBase): msg = 'Failed fetching image from URL %s: %s' % (image_url, ex) self._logger.exception(msg) raise error.FishyError(msg) + finally: + if custom_cert is not None: + custom_cert_file.close() self._logger.debug( 'Fetched image %(file)s for %(identity)s' % { diff --git a/sushy_tools/emulator/templates/certificate.json b/sushy_tools/emulator/templates/certificate.json new file mode 100755 index 00000000..7f98954c --- /dev/null +++ b/sushy_tools/emulator/templates/certificate.json @@ -0,0 +1,10 @@ +{ + "@odata.type": "#Certificate.v1_3_0.Certificate", + "Id": {{ cert_id|string|tojson }}, + "Name": "HTTPS Certificate {{ cert_id }}", + "CertificateString": {{ cert_string|string|tojson }}, + "CertificateType": {{ cert_type|string|tojson }}, + "Oem": {}, + "@odata.id": {{ location|string|tojson }}, + "@Redfish.Copyright": "Copyright 2014-2021 DMTF. For the full DMTF copyright policy, see http://www.dmtf.org/about/policies/copyright." +} diff --git a/sushy_tools/emulator/templates/certificate_service.json b/sushy_tools/emulator/templates/certificate_service.json new file mode 100755 index 00000000..baa160b7 --- /dev/null +++ b/sushy_tools/emulator/templates/certificate_service.json @@ -0,0 +1,14 @@ +{ + "@odata.type": "#CertificateService.v1_0_4.CertificateService", + "Id": "CertificateService", + "Name": "Certificate Service", + "Actions": { + "#CertificateService.ReplaceCertificate": { + "target": "/redfish/v1/CertificateService/Actions/CertificateService.ReplaceCertificate", + "@Redfish.ActionInfo": "/redfish/v1/CertificateService/ReplaceCertificateActionInfo" + } + }, + "Oem": {}, + "@odata.id": "/redfish/v1/CertificateService", + "@Redfish.Copyright": "Copyright 2014-2021 DMTF. For the full DMTF copyright policy, see http://www.dmtf.org/about/policies/copyright." +} diff --git a/sushy_tools/emulator/templates/root.json b/sushy_tools/emulator/templates/root.json index 7557b18a..a7e5cdc4 100644 --- a/sushy_tools/emulator/templates/root.json +++ b/sushy_tools/emulator/templates/root.json @@ -1,8 +1,8 @@ { - "@odata.type": "#ServiceRoot.v1_0_2.ServiceRoot", + "@odata.type": "#ServiceRoot.v1_5_0.ServiceRoot", "Id": "RedvirtService", "Name": "Redvirt Service", - "RedfishVersion": "1.0.2", + "RedfishVersion": "1.5.0", "UUID": "85775665-c110-4b85-8989-e6162170b3ec", "Systems": { "@odata.id": "/redfish/v1/Systems" @@ -13,6 +13,9 @@ "Registries": { "@odata.id": "/redfish/v1/Registries" }, + "CertificateService": { + "@odata.id": "/redfish/v1/CertificateService" + }, "@odata.id": "/redfish/v1/", "@Redfish.Copyright": "Copyright 2014-2016 Distributed Management Task Force, Inc. (DMTF). For the full DMTF copyright policy, see http://www.dmtf.org/about/policies/copyright." } diff --git a/sushy_tools/tests/unit/emulator/controllers/test_certificate_service.py b/sushy_tools/tests/unit/emulator/controllers/test_certificate_service.py new file mode 100644 index 00000000..d9e093e2 --- /dev/null +++ b/sushy_tools/tests/unit/emulator/controllers/test_certificate_service.py @@ -0,0 +1,104 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sushy_tools import error +from sushy_tools.tests.unit.emulator import test_main + + +@test_main.patch_resource('vmedia') +@test_main.patch_resource('managers') +class CertificateServiceTestCase(test_main.EmulatorTestCase): + + def test_root(self, managers_mock, vmedia_mock): + response = self.app.get('redfish/v1/CertificateService') + + self.assertEqual(200, response.status_code) + self.assertIn('#CertificateService.ReplaceCertificate', + response.json['Actions']) + + def test_replace_ok(self, managers_mock, vmedia_mock): + response = self.app.post( + 'redfish/v1/CertificateService/Actions/' + 'CertificateService.ReplaceCertificate', + json={'CertificateString': 'abcd', + 'CertificateType': 'PEM', + 'CertificateUri': ('https://host/redfish/v1/Managers/1234' + '/VirtualMedia/CD/Certificates/1')}) + + self.assertEqual(204, response.status_code) + managers_mock.return_value.get_manager.assert_called_once_with('1234') + vmedia_mock.return_value.replace_certificate.assert_called_once_with( + '1234', 'CD', '1', 'abcd', 'PEM') + + def test_replace_manager_not_found(self, managers_mock, vmedia_mock): + managers_mock.return_value.get_manager.side_effect = error.NotFound + + response = self.app.post( + 'redfish/v1/CertificateService/Actions/' + 'CertificateService.ReplaceCertificate', + json={'CertificateString': 'abcd', + 'CertificateType': 'PEM', + 'CertificateUri': ('https://host/redfish/v1/Managers/1234' + '/VirtualMedia/CD/Certificates/1')}) + + self.assertEqual(404, response.status_code) + managers_mock.return_value.get_manager.assert_called_once_with('1234') + vmedia_mock.return_value.replace_certificate.assert_not_called() + + def test_replace_wrong_uri(self, managers_mock, vmedia_mock): + response = self.app.post( + 'redfish/v1/CertificateService/Actions/' + 'CertificateService.ReplaceCertificate', + json={'CertificateString': 'abcd', + 'CertificateType': 'PEM', + 'CertificateUri': ('https://host/redfish/v1/Managers/1234' + '/NetworkProtocol/HTTPS/Certificates/1')}) + + self.assertEqual(404, response.status_code) + managers_mock.return_value.get_manager.assert_not_called() + vmedia_mock.return_value.replace_certificate.assert_not_called() + + def test_replace_missing_string(self, managers_mock, vmedia_mock): + response = self.app.post( + 'redfish/v1/CertificateService/Actions/' + 'CertificateService.ReplaceCertificate', + json={'CertificateType': 'PEM', + 'CertificateUri': ('https://host/redfish/v1/Managers/1234' + '/VirtualMedia/CD/Certificates/1')}) + + self.assertEqual(400, response.status_code) + managers_mock.return_value.get_manager.assert_not_called() + vmedia_mock.return_value.replace_certificate.assert_not_called() + + def test_replace_wrong_type(self, managers_mock, vmedia_mock): + response = self.app.post( + 'redfish/v1/CertificateService/Actions/' + 'CertificateService.ReplaceCertificate', + json={'CertificateString': 'abcd', + 'CertificateType': 'non-PEM', + 'CertificateUri': ('https://host/redfish/v1/Managers/1234' + '/VirtualMedia/CD/Certificates/1')}) + + self.assertEqual(400, response.status_code) + managers_mock.return_value.get_manager.assert_not_called() + vmedia_mock.return_value.replace_certificate.assert_not_called() + + def test_replace_missing_uri(self, managers_mock, vmedia_mock): + response = self.app.post( + 'redfish/v1/CertificateService/Actions/' + 'CertificateService.ReplaceCertificate', + json={'CertificateString': 'abcd', + 'CertificateType': 'PEM'}) + + self.assertEqual(400, response.status_code) + managers_mock.return_value.get_manager.assert_not_called() + vmedia_mock.return_value.replace_certificate.assert_not_called() diff --git a/sushy_tools/tests/unit/emulator/controllers/test_virtual_media.py b/sushy_tools/tests/unit/emulator/controllers/test_virtual_media.py index 7d32e347..3fb3c044 100644 --- a/sushy_tools/tests/unit/emulator/controllers/test_virtual_media.py +++ b/sushy_tools/tests/unit/emulator/controllers/test_virtual_media.py @@ -158,11 +158,97 @@ class VirtualMediaTestCase(test_main.EmulatorTestCase): vmedia_mock.return_value.eject_image.called_once_with('CD') def test_virtual_media_certificates(self, managers_mock, vmedia_mock): + vmedia_mock.return_value.list_certificates.return_value = [ + vmedia.Certificate('1', 'PEM', 'abcd'), + vmedia.Certificate('2', 'PEM', 'dcba'), + ] response = self.app.get( - '/redfish/v1/Managers/%s/VirtualMedia/CD/Certificates' % self.uuid) + f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates') self.assertEqual(200, response.status_code, response.json) - self.assertEqual(0, response.json['Members@odata.count']) - self.assertEqual([], response.json['Members']) + self.assertEqual(2, response.json['Members@odata.count']) + for index, member in enumerate(response.json['Members']): + self.assertTrue(member['@odata.id'].endswith( + f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD' + f'/Certificates/{index+1}'), member['@odata.id']) self.assertEqual(['PEM'], response.json['@Redfish.SupportedCertificates']) + + def test_virtual_media_certificates_manager_not_found(self, managers_mock, + vmedia_mock): + managers_mock.return_value.get_manager.side_effect = error.NotFound + response = self.app.get( + f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates') + + self.assertEqual(404, response.status_code, response.json) + + def test_virtual_media_add_certificate(self, managers_mock, vmedia_mock): + vmedia_mock.return_value.add_certificate.return_value = \ + vmedia.Certificate('9', 'abcd', 'PEM') + + response = self.app.post( + f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates', + json={'CertificateString': 'abcd', 'CertificateType': 'PEM'}) + + self.assertEqual(204, response.status_code, response.data) + self.assertIn( + f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates/9', + response.headers['Location']) + + def test_virtual_media_add_certificate_no_string(self, managers_mock, + vmedia_mock): + response = self.app.post( + f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates', + json={'CertificateType': 'PEM'}) + + self.assertEqual(400, response.status_code, response.data) + + def test_virtual_media_add_certificate_bad_type(self, managers_mock, + vmedia_mock): + response = self.app.post( + f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates', + json={'CertificateString': 'abcd', 'CertificateType': 'non-PEM'}) + + self.assertEqual(400, response.status_code, response.data) + + def test_virtual_media_get_certificate(self, managers_mock, vmedia_mock): + vmedia_mock.return_value.list_certificates.return_value = [ + vmedia.Certificate('1', 'abcd', 'PEM'), + vmedia.Certificate('2', 'dcba', 'PEM'), + ] + response = self.app.get( + f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates/2') + + self.assertEqual(200, response.status_code, response.json) + self.assertIn('Id', response.json) + self.assertEqual('2', response.json['Id']) + self.assertEqual('dcba', response.json['CertificateString']) + self.assertEqual('PEM', response.json['CertificateType']) + + def test_virtual_media_get_certificate_manager_not_found(self, + managers_mock, + vmedia_mock): + managers_mock.return_value.get_manager.side_effect = error.NotFound + response = self.app.get( + f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates/2') + + self.assertEqual(404, response.status_code, response.json) + + def test_virtual_media_delete_certificate(self, managers_mock, + vmedia_mock): + response = self.app.delete( + f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates/2') + + self.assertEqual(204, response.status_code, response.data) + vmedia_mock.return_value.delete_certificate.assert_called_once_with( + self.uuid, 'CD', '2') + + def test_virtual_media_delete_certificate_manager_not_found(self, + managers_mock, + vmedia_mock): + managers_mock.return_value.get_manager.side_effect = error.NotFound + response = self.app.delete( + f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates/2') + + self.assertEqual(404, response.status_code, response.json) + vmedia_mock.return_value.delete_certificate.assert_not_called() diff --git a/sushy_tools/tests/unit/emulator/resources/test_vmedia.py b/sushy_tools/tests/unit/emulator/resources/test_vmedia.py index 23c716ad..6510366d 100644 --- a/sushy_tools/tests/unit/emulator/resources/test_vmedia.py +++ b/sushy_tools/tests/unit/emulator/resources/test_vmedia.py @@ -313,6 +313,48 @@ class StaticDriverTestCase(base.BaseTestCase): self.assertFalse(device_info['WriteProtected']) self.assertEqual(local_file, device_info['_local_file']) + @mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True) + @mock.patch.object(builtins, 'open', autospec=True) + @mock.patch.object(vmedia.os, 'rename', autospec=True) + @mock.patch.object(vmedia, 'tempfile', autospec=True) + @mock.patch.object(vmedia, 'requests', autospec=True) + def test_insert_image_verify_ssl_custom(self, mock_requests, + mock_tempfile, + mock_rename, mock_open, + mock_get_device): + device_info = {'Verify': True, + 'Certificate': {'String': 'abcd'}} + mock_get_device.return_value = device_info + + mock_tempfile.mkdtemp.return_value = '/alphabet/soup' + mock_tempfile.gettempdir.return_value = '/tmp' + mock_tmp_file = (mock_tempfile.NamedTemporaryFile + .return_value.__enter__.return_value) + mock_tmp_file.name = 'alphabet.soup' + mock_rsp = mock_requests.get.return_value.__enter__.return_value + mock_rsp.headers = { + 'content-disposition': 'attachment; filename="fish.iso"' + } + mock_rsp.status_code = 200 + + local_file = self.test_driver.insert_image( + self.UUID, 'Cd', 'https://fish.it/red.iso', inserted=True, + write_protected=False) + + self.assertEqual('/alphabet/soup/fish.iso', local_file) + mock_requests.get.assert_called_once_with( + 'https://fish.it/red.iso', stream=True, + verify=mock_tempfile.NamedTemporaryFile.return_value.name, + auth=None) + mock_open.assert_called_once_with(mock.ANY, 'wb') + mock_rename.assert_called_once_with( + 'alphabet.soup', '/alphabet/soup/fish.iso') + + self.assertEqual('fish.iso', device_info['Image']) + self.assertTrue(device_info['Inserted']) + self.assertFalse(device_info['WriteProtected']) + self.assertEqual(local_file, device_info['_local_file']) + @mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True) @mock.patch.object(builtins, 'open', autospec=True) @mock.patch.object(vmedia.os, 'rename', autospec=True) @@ -359,3 +401,90 @@ class StaticDriverTestCase(base.BaseTestCase): self.assertFalse(device_info['WriteProtected']) mock_unlink.assert_called_once_with('/tmp/fish.iso') + + @mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True) + def test_list_certificates(self, mock_get_device): + mock_get_device.return_value = { + 'Certificate': {'Type': 'PEM', 'String': 'abcd'}, + } + result = self.test_driver.list_certificates(self.UUID, 'Cd') + self.assertEqual([vmedia.Certificate('Default', 'abcd', 'PEM')], + result) + + def test_list_certificates_empty(self): + result = self.test_driver.list_certificates(self.UUID, 'Cd') + self.assertEqual([], result) + + @mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True) + def test_add_certificate(self, mock_get_device): + device_info = {} + mock_get_device.return_value = device_info + + result = self.test_driver.add_certificate(self.UUID, 'Cd', + 'abcd', 'PEM') + self.assertEqual(vmedia.Certificate('Default', 'abcd', 'PEM'), result) + self.assertEqual({'Certificate': {'Type': 'PEM', 'String': 'abcd'}}, + device_info) + + @mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True) + def test_add_certificate_exists(self, mock_get_device): + device_info = { + 'Certificate': {'Type': 'PEM', 'String': 'abcd'}, + } + mock_get_device.return_value = device_info + + self.assertRaises(error.FishyError, + self.test_driver.add_certificate, + self.UUID, 'Cd', 'defg', 'PEM') + + @mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True) + def test_replace_certificate(self, mock_get_device): + device_info = { + 'Certificate': {'Type': 'PEM', 'String': 'abcd'}, + } + mock_get_device.return_value = device_info + + result = self.test_driver.replace_certificate(self.UUID, 'Cd', + 'Default', 'defg', 'PEM') + self.assertEqual(vmedia.Certificate('Default', 'defg', 'PEM'), result) + self.assertEqual({'Certificate': {'Type': 'PEM', 'String': 'defg'}}, + device_info) + + @mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True) + def test_replace_certificate_wrong_id(self, mock_get_device): + device_info = { + 'Certificate': {'Type': 'PEM', 'String': 'abcd'}, + } + mock_get_device.return_value = device_info + + self.assertRaises(error.NotFound, + self.test_driver.replace_certificate, + self.UUID, 'Cd', 'Other', 'defg', 'PEM') + + @mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True) + def test_replace_certificate_not_found(self, mock_get_device): + device_info = {} + mock_get_device.return_value = device_info + + self.assertRaises(error.NotFound, + self.test_driver.replace_certificate, + self.UUID, 'Cd', 'Default', 'defg', 'PEM') + + @mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True) + def test_delete_certificate(self, mock_get_device): + device_info = { + 'Certificate': {'Type': 'PEM', 'String': 'abcd'}, + } + mock_get_device.return_value = device_info + + self.test_driver.delete_certificate(self.UUID, 'Cd', 'Default') + self.assertEqual({}, device_info) + + @mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True) + def test_delete_certificate_not_found(self, mock_get_device): + device_info = {} + mock_get_device.return_value = device_info + + self.assertRaises(error.NotFound, + self.test_driver.delete_certificate, + self.UUID, 'Cd', 'Default')