Support uploading certificates for virtual media

This change implements the new feature of VirtualMedia 1.4.0: custom TLS
certificates. Adding/listing/deleting/replacing certificates is supported,
but creating a CSR is not.

Change-Id: I30a5c673ab91486cafbe1383c5fe58142102a7c2
This commit is contained in:
Dmitry Tantsur 2021-08-26 19:18:44 +02:00
parent c37ffb959d
commit 0051bd4be0
12 changed files with 557 additions and 12 deletions

View File

@ -0,0 +1,4 @@
---
features:
- |
Adds basic support for custom TLS certificates with virtual media.

View File

@ -66,10 +66,11 @@ def returns_json(decorated_func):
if isinstance(response, flask.Response):
return response
if isinstance(response, tuple):
contents, status = response
contents, status, *headers = response
else:
contents, status = response, 200
contents, status, headers = response, 200, ()
kwargs = {'headers': headers[0]} if headers else {}
return flask.Response(response=contents, status=status,
content_type='application/json')
content_type='application/json', **kwargs)
return decorator

View File

@ -0,0 +1,68 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import flask
from sushy_tools.emulator import api_utils
from sushy_tools import error
certificate_service = flask.Blueprint(
'CertificateService', __name__,
url_prefix='/redfish/v1/CertificateService')
_VMEDIA_URI_PATTERN = re.compile(
'/redfish/v1/Managers/([^/]+)/VirtualMedia/([^/]+)/Certificates/([^/]+)/?$'
)
@certificate_service.route('/', methods=['GET'])
@api_utils.returns_json
def certificate_service_resource():
api_utils.debug('Serving certificate service')
return flask.render_template('certificate_service.json')
@certificate_service.route('/Actions/CertificateService.ReplaceCertificate',
methods=['POST'])
@api_utils.ensure_instance_access
@api_utils.returns_json
def certificate_service_replace_certificate():
if not flask.request.json:
raise error.BadRequest("Empty or malformed certificate")
try:
cert_string = flask.request.json['CertificateString']
cert_type = flask.request.json['CertificateType']
cert_uri = flask.request.json['CertificateUri']
except KeyError as exc:
raise error.BadRequest(f"Missing required parameter {exc}")
match = _VMEDIA_URI_PATTERN.search(cert_uri)
if not match:
raise error.NotFound(
f"Certificates at URI {cert_uri} are not supported")
if cert_type != 'PEM':
raise error.BadRequest(
f"Only PEM certificates are supported, got {cert_type}")
manager_id, device, cert_id = match.groups()
flask.current_app.managers.get_manager(manager_id)
flask.current_app.vmedia.replace_certificate(
manager_id, device, cert_id, cert_string, cert_type)
return '', 204

View File

@ -69,6 +69,7 @@ def virtual_media_resource(identity, device):
@virtual_media.route('/<device>', methods=['PATCH'])
@api_utils.returns_json
def virtual_media_patch(identity, device):
flask.current_app.managers.get_manager(identity)
if not flask.request.json:
raise error.BadRequest("Empty or malformed patch")
@ -90,24 +91,78 @@ def virtual_media_patch(identity, device):
@virtual_media.route('/<device>/Certificates', methods=['GET'])
@api_utils.returns_json
def virtual_media_certificates(identity, device):
flask.current_app.managers.get_manager(identity)
location = \
f'/redfish/v1/Managers/{identity}/VirtualMedia/{device}/Certificates'
certificates = flask.current_app.vmedia.list_certificates(identity, device)
return flask.render_template(
'certificate_collection.json',
location=location,
# TODO(dtantsur): implement
certificates=[],
certificates=[cert.id for cert in certificates],
)
@virtual_media.route('/<device>/Certificates', methods=['POST'])
@api_utils.returns_json
def virtual_media_add_certificate(identity, device):
flask.current_app.managers.get_manager(identity)
if not flask.request.json:
raise error.BadRequest("Empty or malformed certificate")
# TODO(dtantsur): implement
raise error.NotSupportedError("Not implemented")
try:
cert_string = flask.request.json['CertificateString']
cert_type = flask.request.json['CertificateType']
except KeyError as exc:
raise error.BadRequest(f"Missing required parameter {exc}")
if cert_type != 'PEM':
raise error.BadRequest(
f"Only PEM certificates are supported, got {cert_type}")
api_utils.debug('Adding certificate for virtual media %s at '
'manager "%s"', device, identity)
cert = flask.current_app.vmedia.add_certificate(
identity, device, cert_string, cert_type)
location = (
f'/redfish/v1/Managers/{identity}/VirtualMedia/{device}'
f'/Certificates/{cert.id}'
)
return '', 204, {'Location': location}
@virtual_media.route('/<device>/Certificates/<cert_id>', methods=['GET'])
@api_utils.returns_json
def virtual_media_get_certificate(identity, device, cert_id):
flask.current_app.managers.get_manager(identity)
location = (
f'/redfish/v1/Managers/{identity}/VirtualMedia/{device}'
f'/Certificates/{cert_id}'
)
certificates = flask.current_app.vmedia.list_certificates(identity, device)
try:
cert = next(c for c in certificates if c.id == cert_id)
except StopIteration:
raise error.NotFound()
return flask.render_template(
'certificate.json',
location=location,
cert_id=cert.id,
cert_string=cert.string,
cert_type=cert.type_,
)
@virtual_media.route('/<device>/Certificates/<cert_id>', methods=['DELETE'])
@api_utils.returns_json
def virtual_media_delete_certificate(identity, device, cert_id):
flask.current_app.managers.get_manager(identity)
api_utils.debug('Removing certificate %s for virtual media %s at '
'manager "%s"', cert_id, device, identity)
flask.current_app.vmedia.delete_certificate(identity, device, cert_id)
return '', 204
@virtual_media.route('/<device>/Actions/VirtualMedia.InsertMedia',

View File

@ -25,6 +25,7 @@ from ironic_lib import auth_basic
from werkzeug import exceptions as wz_exc
from sushy_tools.emulator import api_utils
from sushy_tools.emulator.controllers import certificate_service as certctl
from sushy_tools.emulator.controllers import virtual_media as vmctl
from sushy_tools.emulator import memoize
from sushy_tools.emulator.resources import chassis as chsdriver
@ -153,6 +154,7 @@ class Application(flask.Flask):
app = Application()
app.register_blueprint(certctl.certificate_service)
app.register_blueprint(vmctl.virtual_media)

View File

@ -30,6 +30,11 @@ DeviceInfo = collections.namedtuple(
'DeviceInfo',
['image_name', 'image_url', 'inserted', 'write_protected',
'username', 'password', 'verify'])
Certificate = collections.namedtuple(
'Certificate',
['id', 'string', 'type_'])
_CERT_ID = "Default"
class StaticDriver(base.DriverBase):
@ -150,6 +155,47 @@ class StaticDriver(base.DriverBase):
device_info['Verify'] = verify
self._devices[(identity, device)] = device_info
def add_certificate(self, identity, device, cert_string, cert_type):
device_info = self._get_device(identity, device)
if "Certificate" in device_info:
raise error.FishyError("Virtual media certificate already exists",
code=409)
device_info["Certificate"] = {'Type': cert_type, 'String': cert_string}
self._devices[(identity, device)] = device_info
return Certificate(_CERT_ID, cert_string, cert_type)
def replace_certificate(self, identity, device, cert_id,
cert_string, cert_type):
device_info = self._get_device(identity, device)
if cert_id != _CERT_ID or "Certificate" not in device_info:
raise error.NotFound(f"Certificate {cert_id} not found")
device_info["Certificate"] = {'Type': cert_type, 'String': cert_string}
self._devices[(identity, device)] = device_info
return Certificate(_CERT_ID, cert_string, cert_type)
def list_certificates(self, identity, device):
device_info = self._get_device(identity, device)
try:
certificate = device_info["Certificate"]
except KeyError:
return []
return [Certificate(_CERT_ID, certificate['String'],
certificate['Type'])]
def delete_certificate(self, identity, device, cert_id):
device_info = self._get_device(identity, device)
if cert_id != _CERT_ID or "Certificate" not in device_info:
raise error.NotFound(f"Certificate {cert_id} not found")
del device_info["Certificate"]
self._devices[(identity, device)] = device_info
def _write_from_response(self, image_url, rsp, tmp_file):
with open(tmp_file.name, 'wb') as fl:
for chunk in rsp.iter_content(chunk_size=8192):
@ -193,8 +239,28 @@ class StaticDriver(base.DriverBase):
# NOTE(dtantsur): it's de facto standard for Redfish to default
# to no certificate validation.
self._config.get('SUSHY_EMULATOR_VMEDIA_VERIFY_SSL', False))
custom_cert = None
if verify_media_cert:
try:
custom_cert = device_info['Certificate']['String']
except KeyError:
self._logger.debug(
'TLS verification is enabled but not custom certificate '
'is provided, using built-in CA for manager %s, virtual '
'media device %s', identity, device)
else:
self._logger.debug(
'Using a custom TLS certificate for manager %s, virtual '
'media device %s', identity, device)
auth = (username, password) if (username and password) else None
if custom_cert is not None:
custom_cert_file = tempfile.NamedTemporaryFile(mode='wt')
custom_cert_file.write(custom_cert)
custom_cert_file.flush()
verify_media_cert = custom_cert_file.name
try:
with requests.get(image_url,
stream=True,
@ -229,6 +295,9 @@ class StaticDriver(base.DriverBase):
msg = 'Failed fetching image from URL %s: %s' % (image_url, ex)
self._logger.exception(msg)
raise error.FishyError(msg)
finally:
if custom_cert is not None:
custom_cert_file.close()
self._logger.debug(
'Fetched image %(file)s for %(identity)s' % {

View File

@ -0,0 +1,10 @@
{
"@odata.type": "#Certificate.v1_3_0.Certificate",
"Id": {{ cert_id|string|tojson }},
"Name": "HTTPS Certificate {{ cert_id }}",
"CertificateString": {{ cert_string|string|tojson }},
"CertificateType": {{ cert_type|string|tojson }},
"Oem": {},
"@odata.id": {{ location|string|tojson }},
"@Redfish.Copyright": "Copyright 2014-2021 DMTF. For the full DMTF copyright policy, see http://www.dmtf.org/about/policies/copyright."
}

View File

@ -0,0 +1,14 @@
{
"@odata.type": "#CertificateService.v1_0_4.CertificateService",
"Id": "CertificateService",
"Name": "Certificate Service",
"Actions": {
"#CertificateService.ReplaceCertificate": {
"target": "/redfish/v1/CertificateService/Actions/CertificateService.ReplaceCertificate",
"@Redfish.ActionInfo": "/redfish/v1/CertificateService/ReplaceCertificateActionInfo"
}
},
"Oem": {},
"@odata.id": "/redfish/v1/CertificateService",
"@Redfish.Copyright": "Copyright 2014-2021 DMTF. For the full DMTF copyright policy, see http://www.dmtf.org/about/policies/copyright."
}

View File

@ -1,8 +1,8 @@
{
"@odata.type": "#ServiceRoot.v1_0_2.ServiceRoot",
"@odata.type": "#ServiceRoot.v1_5_0.ServiceRoot",
"Id": "RedvirtService",
"Name": "Redvirt Service",
"RedfishVersion": "1.0.2",
"RedfishVersion": "1.5.0",
"UUID": "85775665-c110-4b85-8989-e6162170b3ec",
"Systems": {
"@odata.id": "/redfish/v1/Systems"
@ -13,6 +13,9 @@
"Registries": {
"@odata.id": "/redfish/v1/Registries"
},
"CertificateService": {
"@odata.id": "/redfish/v1/CertificateService"
},
"@odata.id": "/redfish/v1/",
"@Redfish.Copyright": "Copyright 2014-2016 Distributed Management Task Force, Inc. (DMTF). For the full DMTF copyright policy, see http://www.dmtf.org/about/policies/copyright."
}

View File

@ -0,0 +1,104 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sushy_tools import error
from sushy_tools.tests.unit.emulator import test_main
@test_main.patch_resource('vmedia')
@test_main.patch_resource('managers')
class CertificateServiceTestCase(test_main.EmulatorTestCase):
def test_root(self, managers_mock, vmedia_mock):
response = self.app.get('redfish/v1/CertificateService')
self.assertEqual(200, response.status_code)
self.assertIn('#CertificateService.ReplaceCertificate',
response.json['Actions'])
def test_replace_ok(self, managers_mock, vmedia_mock):
response = self.app.post(
'redfish/v1/CertificateService/Actions/'
'CertificateService.ReplaceCertificate',
json={'CertificateString': 'abcd',
'CertificateType': 'PEM',
'CertificateUri': ('https://host/redfish/v1/Managers/1234'
'/VirtualMedia/CD/Certificates/1')})
self.assertEqual(204, response.status_code)
managers_mock.return_value.get_manager.assert_called_once_with('1234')
vmedia_mock.return_value.replace_certificate.assert_called_once_with(
'1234', 'CD', '1', 'abcd', 'PEM')
def test_replace_manager_not_found(self, managers_mock, vmedia_mock):
managers_mock.return_value.get_manager.side_effect = error.NotFound
response = self.app.post(
'redfish/v1/CertificateService/Actions/'
'CertificateService.ReplaceCertificate',
json={'CertificateString': 'abcd',
'CertificateType': 'PEM',
'CertificateUri': ('https://host/redfish/v1/Managers/1234'
'/VirtualMedia/CD/Certificates/1')})
self.assertEqual(404, response.status_code)
managers_mock.return_value.get_manager.assert_called_once_with('1234')
vmedia_mock.return_value.replace_certificate.assert_not_called()
def test_replace_wrong_uri(self, managers_mock, vmedia_mock):
response = self.app.post(
'redfish/v1/CertificateService/Actions/'
'CertificateService.ReplaceCertificate',
json={'CertificateString': 'abcd',
'CertificateType': 'PEM',
'CertificateUri': ('https://host/redfish/v1/Managers/1234'
'/NetworkProtocol/HTTPS/Certificates/1')})
self.assertEqual(404, response.status_code)
managers_mock.return_value.get_manager.assert_not_called()
vmedia_mock.return_value.replace_certificate.assert_not_called()
def test_replace_missing_string(self, managers_mock, vmedia_mock):
response = self.app.post(
'redfish/v1/CertificateService/Actions/'
'CertificateService.ReplaceCertificate',
json={'CertificateType': 'PEM',
'CertificateUri': ('https://host/redfish/v1/Managers/1234'
'/VirtualMedia/CD/Certificates/1')})
self.assertEqual(400, response.status_code)
managers_mock.return_value.get_manager.assert_not_called()
vmedia_mock.return_value.replace_certificate.assert_not_called()
def test_replace_wrong_type(self, managers_mock, vmedia_mock):
response = self.app.post(
'redfish/v1/CertificateService/Actions/'
'CertificateService.ReplaceCertificate',
json={'CertificateString': 'abcd',
'CertificateType': 'non-PEM',
'CertificateUri': ('https://host/redfish/v1/Managers/1234'
'/VirtualMedia/CD/Certificates/1')})
self.assertEqual(400, response.status_code)
managers_mock.return_value.get_manager.assert_not_called()
vmedia_mock.return_value.replace_certificate.assert_not_called()
def test_replace_missing_uri(self, managers_mock, vmedia_mock):
response = self.app.post(
'redfish/v1/CertificateService/Actions/'
'CertificateService.ReplaceCertificate',
json={'CertificateString': 'abcd',
'CertificateType': 'PEM'})
self.assertEqual(400, response.status_code)
managers_mock.return_value.get_manager.assert_not_called()
vmedia_mock.return_value.replace_certificate.assert_not_called()

View File

@ -158,11 +158,97 @@ class VirtualMediaTestCase(test_main.EmulatorTestCase):
vmedia_mock.return_value.eject_image.called_once_with('CD')
def test_virtual_media_certificates(self, managers_mock, vmedia_mock):
vmedia_mock.return_value.list_certificates.return_value = [
vmedia.Certificate('1', 'PEM', 'abcd'),
vmedia.Certificate('2', 'PEM', 'dcba'),
]
response = self.app.get(
'/redfish/v1/Managers/%s/VirtualMedia/CD/Certificates' % self.uuid)
f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates')
self.assertEqual(200, response.status_code, response.json)
self.assertEqual(0, response.json['Members@odata.count'])
self.assertEqual([], response.json['Members'])
self.assertEqual(2, response.json['Members@odata.count'])
for index, member in enumerate(response.json['Members']):
self.assertTrue(member['@odata.id'].endswith(
f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD'
f'/Certificates/{index+1}'), member['@odata.id'])
self.assertEqual(['PEM'],
response.json['@Redfish.SupportedCertificates'])
def test_virtual_media_certificates_manager_not_found(self, managers_mock,
vmedia_mock):
managers_mock.return_value.get_manager.side_effect = error.NotFound
response = self.app.get(
f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates')
self.assertEqual(404, response.status_code, response.json)
def test_virtual_media_add_certificate(self, managers_mock, vmedia_mock):
vmedia_mock.return_value.add_certificate.return_value = \
vmedia.Certificate('9', 'abcd', 'PEM')
response = self.app.post(
f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates',
json={'CertificateString': 'abcd', 'CertificateType': 'PEM'})
self.assertEqual(204, response.status_code, response.data)
self.assertIn(
f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates/9',
response.headers['Location'])
def test_virtual_media_add_certificate_no_string(self, managers_mock,
vmedia_mock):
response = self.app.post(
f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates',
json={'CertificateType': 'PEM'})
self.assertEqual(400, response.status_code, response.data)
def test_virtual_media_add_certificate_bad_type(self, managers_mock,
vmedia_mock):
response = self.app.post(
f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates',
json={'CertificateString': 'abcd', 'CertificateType': 'non-PEM'})
self.assertEqual(400, response.status_code, response.data)
def test_virtual_media_get_certificate(self, managers_mock, vmedia_mock):
vmedia_mock.return_value.list_certificates.return_value = [
vmedia.Certificate('1', 'abcd', 'PEM'),
vmedia.Certificate('2', 'dcba', 'PEM'),
]
response = self.app.get(
f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates/2')
self.assertEqual(200, response.status_code, response.json)
self.assertIn('Id', response.json)
self.assertEqual('2', response.json['Id'])
self.assertEqual('dcba', response.json['CertificateString'])
self.assertEqual('PEM', response.json['CertificateType'])
def test_virtual_media_get_certificate_manager_not_found(self,
managers_mock,
vmedia_mock):
managers_mock.return_value.get_manager.side_effect = error.NotFound
response = self.app.get(
f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates/2')
self.assertEqual(404, response.status_code, response.json)
def test_virtual_media_delete_certificate(self, managers_mock,
vmedia_mock):
response = self.app.delete(
f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates/2')
self.assertEqual(204, response.status_code, response.data)
vmedia_mock.return_value.delete_certificate.assert_called_once_with(
self.uuid, 'CD', '2')
def test_virtual_media_delete_certificate_manager_not_found(self,
managers_mock,
vmedia_mock):
managers_mock.return_value.get_manager.side_effect = error.NotFound
response = self.app.delete(
f'/redfish/v1/Managers/{self.uuid}/VirtualMedia/CD/Certificates/2')
self.assertEqual(404, response.status_code, response.json)
vmedia_mock.return_value.delete_certificate.assert_not_called()

View File

@ -313,6 +313,48 @@ class StaticDriverTestCase(base.BaseTestCase):
self.assertFalse(device_info['WriteProtected'])
self.assertEqual(local_file, device_info['_local_file'])
@mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True)
@mock.patch.object(builtins, 'open', autospec=True)
@mock.patch.object(vmedia.os, 'rename', autospec=True)
@mock.patch.object(vmedia, 'tempfile', autospec=True)
@mock.patch.object(vmedia, 'requests', autospec=True)
def test_insert_image_verify_ssl_custom(self, mock_requests,
mock_tempfile,
mock_rename, mock_open,
mock_get_device):
device_info = {'Verify': True,
'Certificate': {'String': 'abcd'}}
mock_get_device.return_value = device_info
mock_tempfile.mkdtemp.return_value = '/alphabet/soup'
mock_tempfile.gettempdir.return_value = '/tmp'
mock_tmp_file = (mock_tempfile.NamedTemporaryFile
.return_value.__enter__.return_value)
mock_tmp_file.name = 'alphabet.soup'
mock_rsp = mock_requests.get.return_value.__enter__.return_value
mock_rsp.headers = {
'content-disposition': 'attachment; filename="fish.iso"'
}
mock_rsp.status_code = 200
local_file = self.test_driver.insert_image(
self.UUID, 'Cd', 'https://fish.it/red.iso', inserted=True,
write_protected=False)
self.assertEqual('/alphabet/soup/fish.iso', local_file)
mock_requests.get.assert_called_once_with(
'https://fish.it/red.iso', stream=True,
verify=mock_tempfile.NamedTemporaryFile.return_value.name,
auth=None)
mock_open.assert_called_once_with(mock.ANY, 'wb')
mock_rename.assert_called_once_with(
'alphabet.soup', '/alphabet/soup/fish.iso')
self.assertEqual('fish.iso', device_info['Image'])
self.assertTrue(device_info['Inserted'])
self.assertFalse(device_info['WriteProtected'])
self.assertEqual(local_file, device_info['_local_file'])
@mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True)
@mock.patch.object(builtins, 'open', autospec=True)
@mock.patch.object(vmedia.os, 'rename', autospec=True)
@ -359,3 +401,90 @@ class StaticDriverTestCase(base.BaseTestCase):
self.assertFalse(device_info['WriteProtected'])
mock_unlink.assert_called_once_with('/tmp/fish.iso')
@mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True)
def test_list_certificates(self, mock_get_device):
mock_get_device.return_value = {
'Certificate': {'Type': 'PEM', 'String': 'abcd'},
}
result = self.test_driver.list_certificates(self.UUID, 'Cd')
self.assertEqual([vmedia.Certificate('Default', 'abcd', 'PEM')],
result)
def test_list_certificates_empty(self):
result = self.test_driver.list_certificates(self.UUID, 'Cd')
self.assertEqual([], result)
@mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True)
def test_add_certificate(self, mock_get_device):
device_info = {}
mock_get_device.return_value = device_info
result = self.test_driver.add_certificate(self.UUID, 'Cd',
'abcd', 'PEM')
self.assertEqual(vmedia.Certificate('Default', 'abcd', 'PEM'), result)
self.assertEqual({'Certificate': {'Type': 'PEM', 'String': 'abcd'}},
device_info)
@mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True)
def test_add_certificate_exists(self, mock_get_device):
device_info = {
'Certificate': {'Type': 'PEM', 'String': 'abcd'},
}
mock_get_device.return_value = device_info
self.assertRaises(error.FishyError,
self.test_driver.add_certificate,
self.UUID, 'Cd', 'defg', 'PEM')
@mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True)
def test_replace_certificate(self, mock_get_device):
device_info = {
'Certificate': {'Type': 'PEM', 'String': 'abcd'},
}
mock_get_device.return_value = device_info
result = self.test_driver.replace_certificate(self.UUID, 'Cd',
'Default', 'defg', 'PEM')
self.assertEqual(vmedia.Certificate('Default', 'defg', 'PEM'), result)
self.assertEqual({'Certificate': {'Type': 'PEM', 'String': 'defg'}},
device_info)
@mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True)
def test_replace_certificate_wrong_id(self, mock_get_device):
device_info = {
'Certificate': {'Type': 'PEM', 'String': 'abcd'},
}
mock_get_device.return_value = device_info
self.assertRaises(error.NotFound,
self.test_driver.replace_certificate,
self.UUID, 'Cd', 'Other', 'defg', 'PEM')
@mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True)
def test_replace_certificate_not_found(self, mock_get_device):
device_info = {}
mock_get_device.return_value = device_info
self.assertRaises(error.NotFound,
self.test_driver.replace_certificate,
self.UUID, 'Cd', 'Default', 'defg', 'PEM')
@mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True)
def test_delete_certificate(self, mock_get_device):
device_info = {
'Certificate': {'Type': 'PEM', 'String': 'abcd'},
}
mock_get_device.return_value = device_info
self.test_driver.delete_certificate(self.UUID, 'Cd', 'Default')
self.assertEqual({}, device_info)
@mock.patch.object(vmedia.StaticDriver, '_get_device', autospec=True)
def test_delete_certificate_not_found(self, mock_get_device):
device_info = {}
mock_get_device.return_value = device_info
self.assertRaises(error.NotFound,
self.test_driver.delete_certificate,
self.UUID, 'Cd', 'Default')