Remove TPM mode support from certificate commands

The TPM mode certificate functionality is not being used currently and
it is not supported for any of the other 10+ certificates on the
platform. Another reason for removal is that it would be challenging to
add this support with the same implementation style.

This work removes the support for TPM mode certificates on the `system
certificate-*` commands.

Test Plan:

PASS: Verify `system certificate-install` command don't accept TPM mode
certificate
PASS: Verify `system help certificate-(install|show)` commands don't
mention TPM mode

Regression:

PASS: Verify system bootstrap/installation finishes correctly with the
changes
PASS: Verify other kinds of certificates (docker_registry, openstack,
openstack_ca, ssl_ca and ssl) can be
- Installed
- Listed
- Showed
- Uninstalled (ssl_ca only)

Story: 2009712
Task: 44086
Signed-off-by: Rafael Camargos <RafaelLucas.Camargos@windriver.com>
Change-Id: I6b3895d7d3dbd929b601413de69037534852d1c4
This commit is contained in:
Rafael Camargos 2021-11-29 10:29:42 -03:00
parent f5d836d161
commit 407d47ed33
8 changed files with 8 additions and 246 deletions

View File

@ -36,20 +36,9 @@ def _print_certificate_show(certificate):
@utils.arg('certificate_uuid', metavar='<certificate_uuid>',
help="UUID of certificate or the reserved word 'tpm' "
"to show TPM certificate")
help="UUID of certificate")
def do_certificate_show(cc, args):
"""Show Certificate details."""
if args.certificate_uuid == 'tpm':
certificates = cc.certificate.list()
for cert in certificates:
if cert.certtype == 'tpm_mode':
args.certificate_uuid = cert.uuid
break
else:
print("No TPM certificate installed")
return
certificate = cc.certificate.get(args.certificate_uuid)
if certificate:
_print_certificate_show(certificate)
@ -76,8 +65,8 @@ def do_certificate_list(cc, args):
help='The passphrase for the PEM file')
@utils.arg('-m', '--mode',
metavar='<mode>',
help="optional mode: 'tpm_mode',"
"'docker_registry, 'openstack', 'openstack_ca', 'ssl_ca'. "
help="optional mode: 'docker_registry', "
"'openstack', 'openstack_ca', 'ssl_ca'. "
"Default is 'ssl'.")
def do_certificate_install(cc, args):
"""Install certificate."""

View File

@ -160,7 +160,7 @@ def do_modify(cc, args):
if print_https_warning:
certificates = cc.certificate.list()
for certificate in certificates:
if certificate.certtype in ['ssl', 'tpm_mode']:
if certificate.certtype == 'ssl':
warning = ("Existing certificate %s is used for https."
% certificate.uuid)
break

View File

@ -128,9 +128,6 @@ class Certificate(base.APIBase):
'start_date',
'expiry_date'])
# insert details for this certificate if they exist
certificate = _insert_certificate_details(certificate)
certificate.links = \
[link.Link.make_link('self', pecan.request.host_url,
'certificates', certificate.uuid),
@ -173,33 +170,6 @@ def _check_certificate_data(certificate):
return certificate
def _insert_certificate_details(certificate):
if not certificate:
return
if certificate.certtype == constants.CERT_MODE_TPM:
try:
tpmconfig = pecan.request.dbapi.tpmconfig_get_one()
except exception.NotFound:
return certificate
tpmdevices = pecan.request.dbapi.tpmdevice_get_list()
certificate.details = {}
states = {}
for device in tpmdevices:
# extract the state info per host
ihost = pecan.request.dbapi.ihost_get(device['host_id'])
if ihost:
states[ihost.hostname] = device.state
if tpmdevices:
certificate.details['state'] = states
if tpmconfig.updated_at:
certificate.details['updated_at'] = \
tpmconfig.updated_at.isoformat()
return certificate
LOCK_NAME = 'CertificateController'
@ -283,7 +253,6 @@ class CertificateController(rest.RestController):
Certificates are installed according to one of the following modes:
default: install certificate for ssl
tpm_mode: install certificate to tpm devices for ssl
docker_registry: install certificate for docker registry
openstack: install certificate for openstack
openstack_ca: install ca certificate for openstack
@ -379,10 +348,9 @@ class CertificateController(rest.RestController):
continue
# validation checking for ssl, tpm_mode, docker_registry
# validation checking for ssl, docker_registry
# and openstack certficcates
if mode in [constants.CERT_MODE_SSL,
constants.CERT_MODE_TPM,
constants.CERT_MODE_DOCKER_REGISTRY,
constants.CERT_MODE_OPENSTACK,
]:
@ -418,27 +386,6 @@ class CertificateController(rest.RestController):
elif msg:
return dict(success="", error=msg)
if mode == constants.CERT_MODE_TPM:
try:
tpm = pecan.request.dbapi.tpmconfig_get_one()
except exception.NotFound:
tpm = None
pass
if tpm:
tpmdevices = pecan.request.dbapi.tpmdevice_get_list()
# if any of the tpm devices are in APPLYING state
# then disallow a modification until previous config
# either applies or fails
for device in tpmdevices:
if device.state == constants.TPMCONFIG_APPLYING:
msg = ("TPM Device %s is still in APPLYING state. "
"Wait for the configuration to finish "
"before attempting a modification." %
device.uuid)
LOG.info(msg)
return dict(success="", error=msg)
try:
config_dict = {'passphrase': passphrase,
'mode': mode,
@ -454,15 +401,6 @@ class CertificateController(rest.RestController):
return dict(success="", error=str(e.value), body="", certificates={})
certificates = pecan.request.dbapi.certificate_get_list()
# ssl and ssl_tpm certs are mutual exclusive, so
# if the new cert is a SSL cert, delete the existing TPM cert as well
# if the new cert is a TPM cert, delete the existing SSL cert as well
for certificate in certificates:
if (mode == constants.CERT_MODE_SSL
and certificate.certtype == constants.CERT_MODE_TPM) or \
(mode == constants.CERT_MODE_TPM
and certificate.certtype == constants.CERT_MODE_SSL):
pecan.request.dbapi.certificate_destroy(certificate.uuid)
# Create new or update existing certificates in sysinv with the
# information returned from conductor manager.
@ -472,7 +410,6 @@ class CertificateController(rest.RestController):
# cert is ICA signed cert (ie, the pem_contents contains
# intermediate CA certs), skip these intermediate CA certs.
if mode in [constants.CERT_MODE_SSL,
constants.CERT_MODE_TPM,
constants.CERT_MODE_DOCKER_REGISTRY,
constants.CERT_MODE_OPENSTACK] \
and inv_cert.get('is_ca', None):

View File

@ -5673,9 +5673,6 @@ class HostController(rest.RestController):
self._semantic_check_cinder_volumes(hostupdate.ihost_orig)
self._semantic_check_filesystem_sizes(hostupdate.ihost_orig)
self._semantic_check_storage_backend(hostupdate.ihost_orig)
# If HTTPS is enabled then we may be in TPM configuration mode
if utils.get_https_enabled():
self._semantic_check_tpm_config(hostupdate.ihost_orig)
if utils.get_system_mode() == constants.SYSTEM_MODE_DUPLEX:
self._semantic_check_duplex_oam_config(hostupdate.ihost_orig)
@ -5938,38 +5935,6 @@ class HostController(rest.RestController):
"Rebooting a simplex system is not allowed."))
return True
@staticmethod
def _semantic_check_tpm_config(ihost):
"""Pre swact/unlock semantic checks for TPM configuration"""
tpmconfig = utils.get_tpm_config()
if tpmconfig:
# retrieve the tpmdevice configuration for this host.
# If this host got Reinstalled or Restored, and it had
# TPM configured on it prior, then we should still find
# a valid tpmdevice entry for this host. Otherwise this
# is a new host or a previous host that was deleted and re-added
tpmdevice = \
pecan.request.dbapi.tpmdevice_get_by_host(ihost['uuid'])
if not tpmdevice or len(tpmdevice) > 1:
raise wsme.exc.ClientSideError(
_("Global TPM configuration found; but "
"no valid TPM Device configuration on host %s." %
ihost['hostname']))
tpmdevice = tpmdevice[0]
if tpmdevice.state == constants.TPMCONFIG_APPLYING:
raise wsme.exc.ClientSideError(
_("TPM configuration in progress on host %s; "
"Please wait for operation to complete "
"before re-attempting." % ihost['hostname']))
elif tpmdevice.state != constants.TPMCONFIG_APPLIED:
# if the TPM certificate for this host is not
# preserved as tpm_data, then disallow unlock/swact
if not tpmdevice.tpm_data:
raise wsme.exc.ClientSideError(
_("TPM configuration not fully applied on host %s; "
"Please run system certificate-install -m tpm_mode "
"before re-attempting." % ihost['hostname']))
def _semantic_check_swact_upgrade(self, from_host, to_host, force_swact=False):
"""
Perform semantic checks related to upgrades prior to swacting host.
@ -6195,10 +6160,6 @@ class HostController(rest.RestController):
ihost_ctr,
force_swact)
# If HTTPS is enabled then we may be in TPM mode
if utils.get_https_enabled():
self._semantic_check_tpm_config(ihost_ctr)
# Check: If DRBD is resizing
controller_fs_list = pecan.request.dbapi.controller_fs_get_list()
for controller_fs in controller_fs_list:

View File

@ -1454,13 +1454,11 @@ CERT_MODE_KUBERNETES_ROOT_CA = 'kubernetes-root-ca'
CERT_MODE_ETCD = 'etcd'
CERT_MODE_SSL = 'ssl'
CERT_MODE_SSL_CA = 'ssl_ca'
CERT_MODE_TPM = 'tpm_mode'
CERT_MODE_DOCKER_REGISTRY = 'docker_registry'
CERT_MODE_OPENSTACK = 'openstack'
CERT_MODE_OPENSTACK_CA = 'openstack_ca'
CERT_MODES_SUPPORTED = [CERT_MODE_SSL,
CERT_MODE_SSL_CA,
CERT_MODE_TPM,
CERT_MODE_DOCKER_REGISTRY,
CERT_MODE_OPENSTACK,
CERT_MODE_OPENSTACK_CA,

View File

@ -32,7 +32,6 @@ collection of inventory data for each host.
import errno
import filecmp
import fnmatch
import glob
import hashlib
import io
import math
@ -7345,8 +7344,7 @@ class ConductorManager(service.PeriodicService):
if system.capabilities.get('https_enabled', False):
certificates = self.dbapi.certificate_get_list()
for certificate in certificates:
if certificate.certtype in [constants.CERT_MODE_SSL,
constants.CERT_MODE_TPM]:
if certificate.certtype == constants.CERT_MODE_SSL:
break
else:
self._config_selfsigned_certificate(context)
@ -12268,38 +12266,12 @@ class ConductorManager(service.PeriodicService):
self._config_apply_runtime_manifest(context, config_uuid, config_dict)
def _destroy_tpm_config(self, context, tpm_obj=None):
"""Delete a tpmconfig."""
if not tpm_obj:
tpm_obj = None
try:
tpm_obj = self.dbapi.tpmconfig_get_one()
except exception.NotFound:
return
tpm_file = tpm_obj.tpm_path
tpmdevices = self.dbapi.tpmdevice_get_list()
for device in tpmdevices:
self.dbapi.tpmdevice_destroy(device.uuid)
self.dbapi.tpmconfig_destroy(tpm_obj.uuid)
self.update_tpm_config_manifests(context,
delete_tpm_file=tpm_file)
alarms = self.fm_api.get_faults_by_id(
fm_constants.FM_ALARM_ID_TPM_INIT)
if alarms:
for alarm in alarms:
self.fm_api.clear_fault(
fm_constants.FM_ALARM_ID_TPM_INIT,
alarm.entity_instance_id)
@staticmethod
def _extract_keys_from_pem(mode, pem_contents, cert_format,
passphrase=None):
"""Extract keys from the pem contents
:param mode: mode one of: ssl, tpm_mode, docker_registry
:param mode: mode one of: ssl, docker_registry
:param pem_contents: pem_contents in unicode
:param cert_format: serialization.PrivateFormat
:param passphrase: passphrase for PEM file
@ -12314,7 +12286,6 @@ class ConductorManager(service.PeriodicService):
private_mode = False
temp_pem_contents = pem_contents.encode("utf-8")
if mode in [constants.CERT_MODE_SSL,
constants.CERT_MODE_TPM,
constants.CERT_MODE_DOCKER_REGISTRY,
constants.CERT_MODE_OPENSTACK,
]:
@ -12424,42 +12395,6 @@ class ConductorManager(service.PeriodicService):
LOG.warn(msg)
raise exception.SysinvException(_(msg))
def _perform_config_certificate_tpm_mode(self, context,
tpm, private_bytes, public_bytes):
personalities = [constants.CONTROLLER]
os_tpmdevices = glob.glob('/dev/tpm*')
if not os_tpmdevices:
msg = "TPM device does not exist on active controller"
LOG.warn(msg)
raise exception.SysinvException(_(msg))
config_uuid = self._config_update_hosts(context, personalities)
cert_path = constants.SSL_CERT_DIR + 'key.pem'
public_path = constants.SSL_CERT_DIR + 'cert.pem'
config_dict = {
'personalities': personalities,
'file_names': [cert_path, public_path],
'file_content': {cert_path: private_bytes,
public_path: public_bytes},
'permissions': constants.CONFIG_FILE_PERMISSION_ROOT_READ_ONLY,
}
self._config_update_file(context, config_uuid, config_dict)
tpmconfig_dict = {'tpm_path': constants.SSL_CERT_DIR + 'object.tpm'}
if not tpm:
self.dbapi.tpmconfig_create(tpmconfig_dict)
tpmconfig_dict.update(
{'cert_path': constants.SSL_CERT_DIR + 'key.pem',
'public_path': constants.SSL_CERT_DIR + 'cert.pem'})
self.update_tpm_config(context,
tpmconfig_dict,
update_file_required=False)
def _get_registry_floating_address(self):
"""gets the registry floating address. Currently this is mgmt
"""
@ -12479,8 +12414,6 @@ class ConductorManager(service.PeriodicService):
In regular mode, the SSL certificate is crafted from the
isolated private and public keys.
In tpm_mode, this is done by tpmconfig
"""
passphrase = config_dict.get('passphrase', None)
@ -12494,27 +12427,8 @@ class ConductorManager(service.PeriodicService):
passphrase)
personalities = [constants.CONTROLLER]
tpm = None
try:
tpm = self.dbapi.tpmconfig_get_one()
except exception.NotFound:
pass
if mode == constants.CERT_MODE_TPM:
private_bytes = self._get_private_bytes_one(private_key)
public_bytes = self._get_public_bytes(cert_list)
self._perform_config_certificate_tpm_mode(
context, tpm, private_bytes, public_bytes)
file_content = public_bytes
# copy the certificate to shared directory
with os.fdopen(os.open(constants.SSL_PEM_FILE_SHARED,
os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
constants.CONFIG_FILE_PERMISSION_ROOT_READ_ONLY),
'wb') as f:
f.write(file_content)
elif mode == constants.CERT_MODE_SSL:
if mode == constants.CERT_MODE_SSL:
config_uuid = self._config_update_hosts(context, personalities)
private_bytes = self._get_private_bytes_one(private_key)
public_bytes = self._get_public_bytes(cert_list)
@ -12535,11 +12449,6 @@ class ConductorManager(service.PeriodicService):
'wb') as f:
f.write(file_content)
if tpm:
LOG.info("tpm_mode not requested; destroy tpmconfig=%s" %
tpm.uuid)
self._destroy_tpm_config(context, tpm_obj=tpm)
config_uuid = self._config_update_hosts(context, personalities)
config_dict = {
"personalities": personalities,

View File

@ -4444,19 +4444,6 @@ class Connection(object):
:param uuid: The uuid of an SDN controller.
"""
@abc.abstractmethod
def tpmconfig_create(self, values):
"""Create a new TPM configuration.
:param values: A dict containing several items used to identify
and track the global TPM configuration. For example:
{
'uuid' : uuidutils.generate_uuid(),
'tpm_path' : Path to TPM object context,
}
:returns: A TPM configuration
"""
@abc.abstractmethod
def tpmconfig_get(self, uuid):
"""Return a TPM configuration

View File

@ -7517,25 +7517,6 @@ class Connection(api.Connection):
raise exception.SDNControllerNotFound(uuid)
query.delete()
@objects.objectify(objects.tpmconfig)
def tpmconfig_create(self, values):
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
tpmconfig = models.tpmconfig()
tpmconfig.update(values)
with _session_for_write() as session:
try:
session.add(tpmconfig)
session.flush()
except db_exc.DBDuplicateEntry:
LOG.error("Failed to add TPM configuration %s. "
"Already exists with this uuid" %
(values['uuid']))
raise exception.TPMConfigAlreadyExists(uuid=values['uuid'])
return tpmconfig
@objects.objectify(objects.tpmconfig)
def tpmconfig_get(self, uuid):
query = model_query(models.tpmconfig)