Debian - Fix CA cert file location

Correct CA cert file location for Debian.

Test Plan:
Verify: Bootstrap and adding a Subcloud on Debian
Verify: Bootstrap and adding a Subcloud on Centos

Story: 2010119
Task: 45761

Signed-off-by: Li Zhu <li.zhu@windriver.com>
Change-Id: Ifcd91a384c8b8b13633d6fe71400cb0174a1eedc
This commit is contained in:
Li Zhu 2022-07-18 16:07:52 -04:00
parent 29fe24acb3
commit 2a95d00dee
3 changed files with 130 additions and 6 deletions

View File

@ -123,3 +123,14 @@ AVAILABILITY_ONLINE = "online"
SYNC_STATUS_UNKNOWN = "unknown"
SYNC_STATUS_IN_SYNC = "in-sync"
SYNC_STATUS_OUT_OF_SYNC = "out-of-sync"
# OS type
OS_RELEASE_FILE = '/etc/os-release'
OS_CENTOS = 'centos'
OS_DEBIAN = 'debian'
SUPPORTED_OS_TYPES = [OS_CENTOS, OS_DEBIAN]
# SSL cert
CERT_CA_FILE_CENTOS = "ca-cert.pem"
CERT_CA_FILE_DEBIAN = "ca-cert.crt"
SSL_CERT_CA_DIR = "/etc/pki/ca-trust/source/anchors/"

View File

@ -1,5 +1,5 @@
# Copyright 2016 Ericsson AB
# Copyright (c) 2017-2021 Wind River Systems, Inc.
# Copyright (c) 2017-2022 Wind River Systems, Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -24,12 +24,12 @@ from oslo_utils import encodeutils
from dccommon import consts
from dccommon.drivers import base
from dccommon import exceptions
from dccommon import utils
LOG = log.getLogger(__name__)
API_VERSION = '1'
CERT_CA_FILE = "ca-cert.pem"
CERT_MODE_DOCKER_REGISTRY = 'docker_registry'
CERT_MODE_SSL = 'ssl'
CERT_MODE_SSL_CA = 'ssl_ca'
@ -38,8 +38,6 @@ CONTROLLER = 'controller'
NETWORK_TYPE_MGMT = 'mgmt'
SSL_CERT_CA_DIR = "/etc/pki/ca-trust/source/anchors/"
SSL_CERT_CA_FILE = os.path.join(SSL_CERT_CA_DIR, CERT_CA_FILE)
SSL_CERT_DIR = "/etc/ssl/private/"
SSL_CERT_FILE = "server-cert.pem"
SSL_PEM_FILE = os.path.join(SSL_CERT_DIR, SSL_CERT_FILE)
@ -491,11 +489,12 @@ class SysinvClient(base.DriverBase):
LOG.info("update_certificate signature {} data {}".format(
signature, data))
if not certificate:
ssl_cert_ca_file = utils.get_ssl_cert_ca_file()
if data:
data['passphrase'] = None
mode = data.get('mode', CERT_MODE_SSL)
if mode == CERT_MODE_SSL_CA:
certificate_files = [SSL_CERT_CA_FILE]
certificate_files = [ssl_cert_ca_file]
elif mode == CERT_MODE_SSL:
certificate_files = [SSL_PEM_FILE]
elif mode == CERT_MODE_DOCKER_REGISTRY:
@ -508,7 +507,7 @@ class SysinvClient(base.DriverBase):
return
elif signature and signature.startswith(CERT_MODE_SSL_CA):
data['mode'] = CERT_MODE_SSL_CA
certificate_files = [SSL_CERT_CA_FILE]
certificate_files = [ssl_cert_ca_file]
elif signature and signature.startswith(CERT_MODE_SSL):
data['mode'] = CERT_MODE_SSL
certificate_files = [SSL_PEM_FILE]

View File

@ -13,17 +13,23 @@
# limitations under the License.
#
import collections
from datetime import datetime
import functools
import os
import random
import re
from eventlet.green import subprocess
from oslo_log import log as logging
from oslo_utils import timeutils
from dccommon import consts
from dccommon import exceptions
from dccommon.exceptions import PlaybookExecutionFailed
from dccommon.exceptions import PlaybookExecutionTimeout
from dccommon.subprocess_cleanup import SubprocessCleanup
from dcorch.common.i18n import _
LOG = logging.getLogger(__name__)
ANSIBLE_PASSWD_PARMS = ['ansible_ssh_pass', 'ansible_become_pass']
@ -41,6 +47,43 @@ STALE_TOKEN_DURATION_STEP = 20
TIMEOUT_EXITCODE = 124
class memoized(object):
"""Decorator.
Caches a function's return value each time it is called.
If called later with the same arguments, the cached value is returned
(not reevaluated).
WARNING: This function should not be used for class methods since it
does not provide weak references; thus would prevent the instance from
being garbage collected.
"""
def __init__(self, func):
self.func = func
self.cache = {}
def __call__(self, *args):
if not isinstance(args, collections.Hashable):
# uncacheable. a list, for instance.
# better to not cache than blow up.
return self.func(*args)
if args in self.cache:
return self.cache[args]
else:
value = self.func(*args)
self.cache[args] = value
return value
def __repr__(self):
'''Return the function's docstring.'''
return self.func.__doc__
def __get__(self, obj, objtype):
'''Support instance methods.'''
return functools.partial(self.__call__, obj)
def _strip_password_from_command(script_command):
"""Strip out any known password arguments from given command"""
logged_command = list()
@ -151,3 +194,74 @@ def is_token_expiring_soon(token,
if timeutils.is_soon(expiry_time, duration):
return True
return False
def _get_key_from_file(file_contents, key):
"""Extract value from KEY=VALUE entries.
Ignore newline, ignore apostrophe, ignore quotation mark.
:param file_contents: contents of file
:param key: key to search
:return: found value or ''
"""
r = re.compile('^{}\=[\'\"]*([^\'\"\n]*)'.format(key), re.MULTILINE)
match = r.search(file_contents)
if match:
return match.group(1)
else:
return ''
@memoized
def get_os_release(release_file=consts.OS_RELEASE_FILE):
"""Function to read release information.
Ignore newline, ignore apostrophe, ignore quotation mark.
:param release_file: file to read from
:return: a tuple of (ID, VERSION)
"""
linux_distro = ('', '')
try:
with open(release_file, 'r') as f:
data = f.read()
linux_distro = (
_get_key_from_file(data, 'ID'),
_get_key_from_file(data, 'VERSION'))
except Exception as e:
raise exceptions.DCCommonException(
msg=_("Failed to open %s : %s" % (release_file, str(e))))
if linux_distro[0] == '':
raise exceptions.DCCommonException(
msg=_("Could not determine os type from %s" % release_file))
# Hint: This code is added here to aid future unit test.
# Probably running unit tests on a non-supported OS (example at
# time of writing: ubuntu), which is perfect, because code reaching
# here will fail, and we just identified a place that would split
# logic between OSs. The failing tests should mock this function
# (get_os_release) for each supported OS.
if linux_distro[0] not in consts.SUPPORTED_OS_TYPES:
raise exceptions.DCCommonException(
msg=_("Unsupported OS detected %s" % linux_distro[0]))
return linux_distro
def get_os_type(release_file=consts.OS_RELEASE_FILE):
return get_os_release(release_file)[0]
def is_debian():
return get_os_type() == consts.OS_DEBIAN
def is_centos():
return get_os_type() == consts.OS_CENTOS
def get_ssl_cert_ca_file():
return os.path.join(
consts.SSL_CERT_CA_DIR,
consts.CERT_CA_FILE_DEBIAN if is_debian() else consts.CERT_CA_FILE_CENTOS)