# config/sysinv/sysinv/sysinv/sysinv/cert_mon/utils.py
#
# 684 lines
# 23 KiB
# Python

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright (c) 2020-2021 Wind River Systems, Inc.
#
# The right to copy, distribute, modify, or otherwise make use
# of this software may be licensed only pursuant to the terms
# of an applicable Wind River license agreement.
#
import base64
import json
import os
import re
import requests
import ssl
import tempfile
from eventlet.green import subprocess
from six.moves.urllib.parse import urlparse
from keystoneclient.v3 import client as keystone_client
from keystoneauth1 import session
from keystoneclient.auth.identity import v3
from oslo_config import cfg
from oslo_log import log
from six.moves.urllib.request import Request
from six.moves.urllib.error import HTTPError
from six.moves.urllib.error import URLError
from six.moves.urllib.request import urlopen
from sysinv.common import constants
from sysinv.openstack.common.keystone_objects import Token
from sysinv.common import kubernetes as sys_kube
# Subcloud sync status
# Endpoint type sent to dcmanager when a subcloud's sync status is updated
ENDPOINT_TYPE_DC_CERT = 'dc-cert'
SYNC_STATUS_UNKNOWN = "unknown"
SYNC_STATUS_IN_SYNC = "in-sync"
SYNC_STATUS_OUT_OF_SYNC = "out-of-sync"
# NOTE(review): the deploy/management/availability values below are not
# referenced in the code visible here; presumably they mirror the subcloud
# states reported by dcmanager -- confirm against the dcmanager API.
DEPLOY_STATE_DONE = 'complete'
MANAGEMENT_UNMANAGED = "unmanaged"
MANAGEMENT_MANAGED = "managed"
AVAILABILITY_OFFLINE = "offline"
AVAILABILITY_ONLINE = "online"
# Kubernetes namespaces the certificate secrets are read from
CERT_NAMESPACE_SYS_CONTROLLER = 'dc-cert'
CERT_NAMESPACE_SUBCLOUD_CONTROLLER = 'sc-cert'
# Value of dc_role until get_dc_role() has queried sysinv
DC_ROLE_UNDETECTED = 'unknown'
LOG = log.getLogger(__name__)
CONF = cfg.CONF
# Module-level cache of the distributed cloud role, filled by get_dc_role()
dc_role = DC_ROLE_UNDETECTED
def update_admin_ep_cert(token, ca_crt, tls_crt, tls_key):
    """Ask the local sysinv API to renew the admin endpoint certificate.

    :param token: token object; supplies the sysinv internal URL and is
                  forwarded to rest_api_request for authentication
    :param ca_crt: not used by this function
    :param tls_crt: not used by this function
    :param tls_key: not used by this function
    :raises Exception: if the response does not carry result == 'OK'
    """
    renew_url = (token.get_service_internal_url('platform', 'sysinv') +
                 '/certificate/certificate_renew')
    payload = {'certtype': constants.CERTIFICATE_TYPE_ADMIN_ENDPOINT}
    resp = rest_api_request(token, "POST", renew_url, json.dumps(payload))

    # Guard clause: anything other than an explicit OK is a failure
    if 'result' not in resp or resp['result'] != 'OK':
        LOG.error('Request response %s' % resp)
        raise Exception('Update admin endpoint certificate failed')

    LOG.info('Update admin endpoint certificate request succeeded')
def verify_adminep_cert_chain():
    """Verify the admin endpoint certificate chain, delete it if invalid.

    The intermediate CA certificate and the admin endpoint certificate are
    read from their k8s secrets (the tls.crt entry of each), base64-decoded
    into temporary files and passed to ``openssl verify``: the DC root CA
    file is the trust anchor and the intermediate CA is supplied as an
    untrusted link. When verification fails the admin endpoint secret is
    deleted.

    :return: True if openssl accepts the chain, False otherwise
    :raises Exception: if either secret has no 'tls.crt' entry
    """
    namespace = CERT_NAMESPACE_SUBCLOUD_CONTROLLER
    kube_op = sys_kube.KubeOperator()

    ica_secret = kube_op.kube_get_secret(
        constants.SC_INTERMEDIATE_CA_SECRET_NAME, namespace)
    if 'tls.crt' not in ica_secret.data:
        raise Exception('%s tls.crt (ICA) data missing'
                        % (constants.SC_INTERMEDIATE_CA_SECRET_NAME))

    adminep_secret = kube_op.kube_get_secret(
        constants.SC_ADMIN_ENDPOINT_SECRET_NAME, namespace)
    if 'tls.crt' not in adminep_secret.data:
        raise Exception('%s tls.crt data missing'
                        % (constants.SC_ADMIN_ENDPOINT_SECRET_NAME))

    ica_pem = base64.b64decode(ica_secret.data['tls.crt'])
    adminep_pem = base64.b64decode(adminep_secret.data['tls.crt'])

    with tempfile.NamedTemporaryFile() as ica_file, \
            tempfile.NamedTemporaryFile() as adminep_file:
        # openssl reads the files by name, so flush the data to disk first
        for handle, pem in ((ica_file, ica_pem), (adminep_file, adminep_pem)):
            handle.write(pem)
            handle.flush()

        openssl_cmd = ['openssl', 'verify',
                       '-CAfile', constants.DC_ROOT_CA_CERT_PATH,
                       '-untrusted', ica_file.name,
                       adminep_file.name]
        proc = subprocess.Popen(openssl_cmd,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        # communicate() waits for the process to exit and sets returncode
        stdout, stderr = proc.communicate()

        if proc.returncode == 0:
            LOG.info('verify_adminep_cert_chain passed. Valid chain')
            return True

        LOG.info('verify_adminep_cert_chain: Chain is invalid\n%s\n%s'
                 % (stdout, stderr))
        res = kube_op.kube_delete_secret(
            constants.SC_ADMIN_ENDPOINT_SECRET_NAME, namespace)
        LOG.info('Deleting AdminEP secret due to invalid chain. %s:%s, result %s, msg %s'
                 % (namespace,
                    constants.SC_ADMIN_ENDPOINT_SECRET_NAME,
                    res.status, res.message))
        return False
def dc_get_subcloud_sysinv_url(subcloud_name):
    """Return the admin-interface sysinv endpoint URL of a subcloud.

    This is the 'sysinv' / admin-interface case of
    dc_get_service_endpoint_url(), which performs the keystone lookup,
    raises when the service or endpoint is missing and logs the URL found.
    On top of that, an empty URL is rejected here.

    :param subcloud_name: region name of the subcloud
    :return: the sysinv endpoint URL
    :raises Exception: if the sysinv service or the subcloud endpoint cannot
                       be found, or the endpoint URL is empty
    """
    sc_sysinv_url = dc_get_service_endpoint_url(
        subcloud_name, 'sysinv', constants.OS_INTERFACE_ADMIN)
    if not sc_sysinv_url:
        raise Exception('{} sysinv endpoint is None'.format(subcloud_name))
    return sc_sysinv_url
def dc_get_service_endpoint_url(region, service_name, endpoint_type):
    """Look up a service endpoint URL in keystone.

    A keystone client is built from the [endpoint_cache] credentials, the
    service is located by name and its endpoints are filtered by interface
    and region. The first matching endpoint wins.

    :param region: region the endpoint must belong to
    :param service_name: keystone service name, e.g. 'sysinv'
    :param endpoint_type: endpoint interface to select
    :return: URL of the first matching endpoint
    :raises Exception: if no such service or no such endpoint exists
    """
    ec_conf = CONF.endpoint_cache
    credentials = v3.Password(
        auth_url=ec_conf.auth_uri,
        username=ec_conf.username,
        user_domain_name=ec_conf.user_domain_name,
        password=ec_conf.password,
        project_name=ec_conf.project_name,
        project_domain_name=ec_conf.project_domain_name,
    )
    keystone_session = session.Session(
        auth=credentials, timeout=ec_conf.http_connect_timeout)
    ks_client = keystone_client.Client(
        session=keystone_session,
        region_name=constants.REGION_ONE_NAME,
        interface=constants.OS_INTERFACE_INTERNAL)

    found_services = ks_client.services.list(name=service_name)
    if len(found_services) < 1:
        raise Exception('Cannot find %s service' % service_name)

    found_endpoints = ks_client.endpoints.list(
        service=found_services[0].id,
        interface=endpoint_type,
        region=region)
    if len(found_endpoints) < 1:
        raise Exception('Cannot find %s endpoint for %s' %
                        (service_name, region))

    endpoint_url = found_endpoints[0].url
    LOG.info('%s %s endpoint %s' % (region, service_name, endpoint_url))
    return endpoint_url
def update_subcloud_ca_cert(
        token, sc_name, sysinv_url, ca_crt, tls_crt, tls_key):
    """Send a subcloud its intermediate CA through the sysinv renew API.

    :param token: token forwarded to rest_api_request for authentication
    :param sc_name: subcloud name, used in log and error messages only
    :param sysinv_url: base URL of the subcloud's sysinv API
    :param ca_crt: sent as 'root_ca_crt'
    :param tls_crt: sent as 'sc_ca_cert'
    :param tls_key: sent as 'sc_ca_key'
    :raises Exception: if the response does not carry result == 'OK'
    """
    renew_url = sysinv_url + '/certificate/certificate_renew'
    payload = {
        'certtype': constants.CERTIFICATE_TYPE_ADMIN_ENDPOINT_INTERMEDIATE_CA,
        'root_ca_crt': ca_crt,
        'sc_ca_cert': tls_crt,
        'sc_ca_key': tls_key,
    }
    request_timeout = int(CONF.endpoint_cache.http_connect_timeout)
    resp = rest_api_request(token, "POST", renew_url, json.dumps(payload),
                            timeout=request_timeout)

    if 'result' not in resp or resp['result'] != 'OK':
        LOG.error('Request response %s' % resp)
        raise Exception('Update %s intermediate CA cert failed' % sc_name)

    LOG.info('Update %s intermediate CA cert request succeed' % sc_name)
def get_subcloud(token, subcloud_name):
    """Fetch a single subcloud record from dcmanager.

    :param token: token forwarded to rest_api_request for authentication
    :param subcloud_name: name appended to the dcmanager /subclouds/ URL
    :return: the decoded dcmanager response
    """
    dcmanager_url = dc_get_service_endpoint_url(
        constants.SYSTEM_CONTROLLER_REGION,
        'dcmanager',
        constants.OS_INTERFACE_INTERNAL)
    api_cmd = dcmanager_url + '/subclouds/%s' % subcloud_name
    LOG.info('api_cmd %s' % api_cmd)
    return rest_api_request(token, "GET", api_cmd)
def load_subclouds(resp):
    """Flatten a subcloud listing response into a list of plain dicts.

    Each entry of ``resp['subclouds']`` becomes one dict holding its
    'name', 'management-state', 'availability-status' and 'sync_status',
    plus one key per item of its 'endpoint_sync_status' list (keyed by the
    item's 'endpoint_type', valued by its 'sync_status').

    :param resp: decoded response dict with a 'subclouds' list
    :return: list of flattened subcloud dicts, in response order
    :raises KeyError: if an expected key is missing from the response
    """
    subclouds = []
    for obj in resp['subclouds']:
        sc = {
            'name': obj['name'],
            'management-state': obj['management-state'],
            'availability-status': obj['availability-status'],
            'sync_status': obj['sync_status'],
        }
        # Per-endpoint statuses are promoted to top-level keys
        for ss in obj['endpoint_sync_status']:
            sc[ss['endpoint_type']] = ss['sync_status']
        subclouds.append(sc)
    return subclouds
def get_subclouds_from_dcmanager(token):
    """List all subclouds known to dcmanager.

    :param token: token forwarded to rest_api_request for authentication
    :return: the dcmanager listing as flattened by load_subclouds()
    """
    dcmanager_url = dc_get_service_endpoint_url(
        constants.SYSTEM_CONTROLLER_REGION,
        'dcmanager',
        constants.OS_INTERFACE_INTERNAL)
    list_url = dcmanager_url + '/subclouds'
    LOG.debug('api_cmd %s' % list_url)
    return load_subclouds(rest_api_request(token, "GET", list_url))
def update_subcloud_status(token, subcloud_name, status):
    """Report the dc-cert endpoint sync status of a subcloud to dcmanager.

    :param token: token forwarded to rest_api_request for authentication
    :param subcloud_name: subcloud whose status is updated
    :param status: sync status value sent as 'status'
    :raises Exception: if the response does not carry result == 'OK'
    """
    service_name = 'dcmanager'
    api_url = dc_get_service_endpoint_url(constants.SYSTEM_CONTROLLER_REGION,
                                          service_name,
                                          constants.OS_INTERFACE_INTERNAL)
    api_cmd = api_url + '/subclouds/%s/update_status' % subcloud_name
    api_cmd_payload = {
        'endpoint': ENDPOINT_TYPE_DC_CERT,
        'status': status,
    }
    resp = rest_api_request(token, "PATCH", api_cmd,
                            json.dumps(api_cmd_payload))
    if 'result' in resp and resp['result'] == 'OK':
        LOG.info('Update admin endpoint request succeed')
    else:
        LOG.error('Request response %s' % resp)
        # The message previously kept a literal '%s': the subcloud name
        # was never interpolated.
        raise Exception('Update %s admin endpoint failed' % subcloud_name)
def rest_api_request(token, method, api_cmd,
                     api_cmd_payload=None, timeout=10):
    """
    Make a rest-api request

    :param token: token object providing get_id() and set_expired(); a
                  falsy value sends the request without X-Auth-Token
    :param method: HTTP method to use, e.g. "GET", "POST", "PATCH"
    :param api_cmd: full URL of the request
    :param api_cmd_payload: request body (JSON text), or None for no body
    :param timeout: timeout in seconds passed to urlopen
    Returns: response as a dictionary ({} for an empty response body)
    :raises HTTPError: on an HTTP error status; a 401 additionally marks
                       the token as expired
    :raises URLError: if the URL cannot be reached
    """
    api_cmd_headers = {
        'Content-type': "application/json",
        'User-Agent': "cert-mon/1.0",
    }

    # Python 3's urlopen only accepts a byte string body. On Python 2
    # ``bytes`` is ``str``, so json.dumps() output passes through untouched.
    if api_cmd_payload is not None and \
            not isinstance(api_cmd_payload, bytes):
        api_cmd_payload = api_cmd_payload.encode('utf-8')

    try:
        # The body goes through the constructor: Request.add_data() only
        # exists on Python 2 (it was removed in Python 3.4).
        request_info = Request(api_cmd, data=api_cmd_payload)
        request_info.get_method = lambda: method
        if token:
            request_info.add_header("X-Auth-Token", token.get_id())
        request_info.add_header("Accept", "application/json")
        for header_type, header_value in api_cmd_headers.items():
            request_info.add_header(header_type, header_value)

        request = urlopen(request_info, timeout=timeout)
        try:
            response = request.read()
        finally:
            request.close()

        # read() returns bytes on Python 3, so compare by emptiness rather
        # than against the text literal "".
        response = json.loads(response) if response else {}
    except HTTPError as e:
        if 401 == e.code and token:
            token.set_expired()
        raise
    except URLError:
        LOG.error("Cannot access %s" % api_cmd)
        raise
    return response
def get_system(token, method, api_cmd, api_cmd_headers=None,
               api_cmd_payload=None, timeout=10):
    """
    Make a rest-api request

    :param token: token object providing get_id() and set_expired(); a
                  falsy value sends the request without X-Auth-Token
    :param method: HTTP method to use
    :param api_cmd: full URL of the request
    :param api_cmd_headers: optional dict of extra request headers
    :param api_cmd_payload: optional request body
    :param timeout: timeout in seconds passed to urlopen
    Returns: response as a dictionary

    NOTE(review): because of the ``return`` in the ``finally`` clause no
    exception ever leaves this function. On an HTTPError the caller gets
    the decoded error message (or {}); on any other failure it gets
    whatever ``response`` held at that point, initially None. Confirm
    with the callers whether this is intended before changing it.
    """
    LOG.debug("%s cmd:%s hdr:%s payload:%s" % (method,
              api_cmd, api_cmd_headers, api_cmd_payload))
    response = None
    try:
        request_info = Request(api_cmd)
        # Force the HTTP verb instead of letting urllib pick GET/POST
        request_info.get_method = lambda: method
        if token:
            request_info.add_header("X-Auth-Token", token.get_id())
        request_info.add_header("Accept", "application/json")
        if api_cmd_headers is not None:
            for header_type, header_value in api_cmd_headers.items():
                request_info.add_header(header_type, header_value)
        if api_cmd_payload is not None:
            # NOTE(review): Request.add_data() exists on Python 2 only; it
            # was removed from urllib.request in Python 3.4.
            request_info.add_data(api_cmd_payload)
        request = urlopen(request_info, timeout=timeout)
        response = request.read()
        # NOTE(review): on Python 3 read() returns bytes, which never
        # compare equal to "" -- an empty body then reaches json.loads().
        if response == "":
            response = json.loads("{}")
        else:
            response = json.loads(response)
        # NOTE(review): not reached when json.loads() raises, leaving the
        # connection to be closed by garbage collection.
        request.close()
    except HTTPError as e:
        if 401 == e.code:
            if token:
                token.set_expired()
        LOG.warn("HTTP Error e.code=%s e=%s" % (e.code, e))
        # Hand the server's error message back as the response when present
        if hasattr(e, 'msg') and e.msg:
            response = json.loads(e.msg)
        else:
            response = json.loads("{}")
        # Has no effect for the caller: see the finally clause below
        raise
    except URLError:
        LOG.error("Cannot access %s" % api_cmd)
        # Has no effect for the caller: see the finally clause below
        raise
    finally:
        # A return inside finally discards any exception in flight,
        # including the two re-raises above.
        return response
def get_token():
    """Request a keystone token with the [keystone_authtoken] settings.

    :return: whatever _get_token() returns for those credentials
    """
    ks_conf = CONF.keystone_authtoken
    return _get_token(
        ks_conf.auth_url + '/v3/auth/tokens',
        ks_conf.project_name,
        ks_conf.username,
        ks_conf.password,
        ks_conf.user_domain_name,
        ks_conf.project_domain_name,
        ks_conf.region_name)
def get_dc_token(region_name):
    """Request a keystone token with the [endpoint_cache] settings.

    :param region_name: region name passed on to _get_token()
    :return: whatever _get_token() returns for those credentials
    """
    ec_conf = CONF.endpoint_cache
    return _get_token(
        ec_conf.auth_uri + '/auth/tokens',
        ec_conf.project_name,
        ec_conf.username,
        ec_conf.password,
        ec_conf.user_domain_name,
        ec_conf.project_domain_name,
        region_name)
def _get_token(auth_url, auth_project, username, password, user_domain,
               project_domain, region_name, timeout=60):
    """
    Ask OpenStack Keystone for a token

    :param auth_url: full URL of the keystone token API
    :param auth_project: project name the token is scoped to
    :param username: user name to authenticate with
    :param password: password of that user
    :param user_domain: domain name of the user
    :param project_domain: domain name of the project
    :param region_name: stored in the token for service url lookup
    :param timeout: timeout in seconds passed to urlopen
    Returns: token object or None on failure
    """
    auth_body = {
        "auth": {
            "identity": {
                "methods": ["password"],
                "password": {
                    "user": {
                        "name": username,
                        "password": password,
                        "domain": {"name": user_domain},
                    },
                },
            },
            "scope": {
                "project": {
                    "name": auth_project,
                    "domain": {"name": project_domain},
                },
            },
        },
    }
    try:
        # The body is passed to the constructor as a byte string:
        # Request.add_data() only exists on Python 2, and Python 3's
        # urlopen rejects a text body. json.dumps() emits ASCII by default,
        # so the encode is a no-op for Python 2.
        request_info = Request(auth_url,
                               data=json.dumps(auth_body).encode('utf-8'))
        request_info.add_header("Content-type", "application/json")
        request_info.add_header("Accept", "application/json")

        request = urlopen(request_info, timeout=timeout)
        try:
            # Identity API v3 returns token id in X-Subject-Token
            # response header. get() is available on the header object of
            # both Python 2 and 3, getheader() on Python 2 only.
            token_id = request.info().get('X-Subject-Token')
            response = json.loads(request.read())
        finally:
            # Close the connection even if the body is not valid JSON
            request.close()

        # save the region name for service url lookup
        return Token(response, token_id, region_name)
    except HTTPError as e:
        LOG.error("%s, %s" % (e.code, e.read()))
        return None
    except URLError as e:
        LOG.error(e)
        return None
def init_keystone_auth_opts():
    """Register the keystone credential options on the global config.

    The same option set is registered under [keystone_authtoken] and,
    extended by http_connect_timeout, under [endpoint_cache].
    """
    keystone_opts = [
        cfg.StrOpt('username', help='Username of account'),
        cfg.StrOpt('auth_uri', help='authentication uri'),
        cfg.StrOpt('password', help='Password of account'),
        cfg.StrOpt('project_name', help='Tenant name of account'),
        cfg.StrOpt('user_domain_name', default='Default',
                   help='User domain name of account'),
        cfg.StrOpt('project_domain_name', default='Default',
                   help='Project domain name of account'),
        cfg.StrOpt('region_name', default='', help='Region name'),
        cfg.StrOpt('auth_url', default='', help='Authorization url'),
    ]
    # NOTE(review): a string option with an integer default; callers
    # convert the value themselves. Left unchanged here.
    endpoint_opts = keystone_opts + [
        cfg.StrOpt('http_connect_timeout', default=10,
                   help='HTTP connection timeout'),
    ]

    registrations = (
        ('keystone_authtoken', 'Keystone options', keystone_opts),
        ('endpoint_cache', 'Endpoint cache', endpoint_opts),
    )
    for group_name, group_title, group_opts in registrations:
        opt_group = cfg.OptGroup(name=group_name, title=group_title)
        cfg.CONF.register_opts(group_opts, group=opt_group.name)
def get_subcloud_secrets():
    """get subcloud name and ICA secret name pairs from k8s secret

    Every subcloud comes with an ICA entry in k8s secret, named
    '<subcloud name>-adminep-ca-certificate'.

    :return: dict of subcloud name and ICA secret name pairs
    """
    secret_pattern = re.compile('-adminep-ca-certificate$')
    kube_op = sys_kube.KubeOperator()
    # List the namespace the ICA secrets are read from in
    # get_sc_intermediate_ca_secret(), instead of borrowing the endpoint
    # type constant that merely shares its value.
    secret_list = kube_op.kube_list_secret(CERT_NAMESPACE_SYS_CONTROLLER)

    subcloud_secrets = {}
    for secret in secret_list:
        secret_name = secret.metadata.name
        m = secret_pattern.search(secret_name)
        # A match at position 0 would mean an empty subcloud name: skip it
        if m and m.start() > 0:
            subcloud_secrets[secret_name[:m.start()]] = secret_name
    return subcloud_secrets
def get_subclouds():
    """get name of all subclouds from k8s secret

    Every subcloud comes with an ICA entry in k8s secret

    :return: list of subcloud names
    """
    # list() keeps the documented return type on Python 3, where
    # dict.keys() is a view that cannot be indexed.
    return list(get_subcloud_secrets())
def get_intermediate_ca_secret_name(sc):
    """Build the name of the k8s secret holding a subcloud's ICA.

    :param sc: subcloud name
    :return: the subcloud name followed by '-adminep-ca-certificate'
    """
    name_template = '{}-adminep-ca-certificate'
    return name_template.format(sc)
def get_sc_intermediate_ca_secret(sc):
    """Fetch the k8s secret holding a subcloud's intermediate CA.

    :param sc: subcloud name
    :return: result of kube_get_secret() for the subcloud's ICA secret in
             the system controller certificate namespace
    """
    ica_secret_name = get_intermediate_ca_secret_name(sc)
    return sys_kube.KubeOperator().kube_get_secret(
        ica_secret_name, CERT_NAMESPACE_SYS_CONTROLLER)
def get_endpoint_certificate(endpoint):
    """Fetch the certificate presented by a TLS endpoint.

    :param endpoint: URL of the endpoint, e.g. https://host:port/path
    :return: the server certificate as returned by
             ssl.get_server_certificate()
    """
    url = urlparse(endpoint)
    port = url.port
    if port is None:
        # The URL carries no explicit port: use the standard HTTPS port
        # rather than handing None to the socket layer.
        port = 443
    return ssl.get_server_certificate((url.hostname, port))
def get_dc_role():
    """Return the distributed cloud role of this system.

    The role is read from the sysinv /isystems API on first use and cached
    in the module-level ``dc_role``; later calls return the cached value.

    :return: the 'distributed_cloud_role' value of the system record
    :raises Exception: if sysinv does not return exactly one system
    """
    global dc_role
    if dc_role != DC_ROLE_UNDETECTED:
        return dc_role

    token = get_token()
    sysinv_url = token.get_service_internal_url('platform', 'sysinv')
    systems = rest_api_request(
        token, "GET", sysinv_url + '/isystems')['isystems']
    if len(systems) != 1:
        raise Exception('Failed to access system data')

    system = systems[0]
    dc_role = system['distributed_cloud_role']
    LOG.info('Result %s' % system)
    return dc_role
def get_isystems_uuid(token):
    """Return the uuid of the system record held by sysinv.

    :param token: token object; supplies the sysinv internal URL and is
                  forwarded to rest_api_request for authentication
    :return: the 'uuid' value of the single system record
    :raises Exception: if sysinv does not return exactly one system
    """
    base_url = token.get_service_internal_url(
        constants.SERVICE_TYPE_PLATFORM, constants.SYSINV_USERNAME)
    systems = rest_api_request(
        token, "GET", base_url + '/isystems')['isystems']
    if len(systems) != 1:
        raise Exception('Failed to access system data')
    return systems[0]['uuid']
def enable_https(token, system_uuid):
    """Turn on https for the system through the sysinv API.

    :param token: token object; supplies the sysinv internal URL and is
                  forwarded to rest_api_request for authentication
    :param system_uuid: uuid of the system record to patch
    :return: True if the response reports https_enabled as True,
             False otherwise
    """
    sysinv_url = token.get_service_internal_url(
        constants.SERVICE_TYPE_PLATFORM, constants.SYSINV_USERNAME)
    api_cmd = sysinv_url + '/isystems/' + system_uuid

    patch = [{'op': 'replace', 'path': '/https_enabled', 'value': 'true'}]
    resp = rest_api_request(token, "PATCH", api_cmd, json.dumps(patch))

    if resp['capabilities']['https_enabled'] is True:
        LOG.info('Enable https patch request succeeded')
        return True

    # No exception is being handled here, so log at error level:
    # LOG.exception() would attach an empty traceback.
    LOG.error('Enable https failed! resp=%s' % resp)
    return False
def upload_request_with_data(token, url, **kwargs):
    """POST a multipart upload and return the decoded JSON reply.

    :param token: token object whose id is sent as X-Auth-Token
    :param url: URL to post to
    :param kwargs: 'body' (required) is the content uploaded as the 'file'
                   part; 'data' (optional) is sent as form data
    :return: the JSON-decoded response body
    """
    auth_headers = {"X-Auth-Token": token.get_id()}
    upload_part = ("for_upload", kwargs['body'],)
    req = requests.post(url,
                        headers=auth_headers,
                        files={'file': upload_part},
                        data=kwargs.get('data'))
    reply = req.json()
    LOG.info('response from upload API = %s' % reply)
    return reply
def rest_api_upload(token, filepath, url, data=None):
    """
    Make a rest-api upload call

    :param token: token object forwarded to upload_request_with_data()
    :param filepath: path of the file to upload, opened in binary mode
    :param url: URL to post the file to
    :param data: optional form data sent along with the file
    :return: the JSON-decoded response of the upload API
    :raises Exception: whatever open() raises if the file cannot be opened
    """
    LOG.info('rest_api_upload called. filepath=%s, url=%s, data=%s' % (filepath, url, data))
    try:
        file_to_upload = open(filepath, 'rb')
    except Exception as e:
        LOG.exception(e)
        # Re-raise: continuing would only fail a line later with an
        # UnboundLocalError that hides the real cause.
        raise
    # Close the file once the upload has finished (or failed)
    with file_to_upload:
        return upload_request_with_data(token, url, body=file_to_upload,
                                        data=data)
def update_pemfile(tls_crt, tls_key):
    """Write a certificate and its key into a new temporary .pem file.

    The file holds the certificate, a newline, then the key. Removing the
    file is left to the caller.

    :param tls_crt: certificate text
    :param tls_key: key text
    :return: path of the temporary file
    :raises Exception: whatever creating or writing the file raises
    """
    LOG.info('Updating temporary pemfile')
    # Bound before the try so that the finally clause can test it even when
    # mkstemp() itself fails; otherwise an UnboundLocalError would replace
    # the original error.
    fd = None
    try:
        fd, tmppath = tempfile.mkstemp(suffix='.pem')
        with open(tmppath, 'w+') as f:
            f.write(tls_crt)
            f.write("\n")
            f.write(tls_key)
    except Exception as e:
        LOG.exception(e)
        raise
    finally:
        if fd is not None:
            os.close(fd)
    return tmppath
def update_platform_cert(token, cert_type, pem_file_path, force=False):
    """Update a platform certificate using the sysinv API

    The PEM file is removed afterwards, whether or not the upload worked.

    :param token: the token to access the sysinv API
    :param cert_type: the type of the certificate that is being updated
    :param pem_file_path: path to the certificate file in PEM format
    :param force: whether to bypass semantic checks and force the update,
                  defaults to False
    """
    LOG.info('Updating %s certificate. pem_file_path=%s' % (cert_type, pem_file_path))
    try:
        sysinv_url = token.get_service_internal_url(
            constants.SERVICE_TYPE_PLATFORM, constants.SYSINV_USERNAME)
        api_cmd = sysinv_url + '/certificate/certificate_install'

        data = {'mode': cert_type,
                'force': str(force).lower()}

        response = rest_api_upload(token, pem_file_path, api_cmd, data)
        error = response.get('error')
        if error:
            LOG.info('Failed. Certificate not installed. Error=%s' % error)
        else:
            LOG.info('Certificate successfully installed')
    finally:
        # cleanup: runs on failure too, so that the temporary file holding
        # the certificate and key is not left behind when the upload raises
        try:
            os.remove(pem_file_path)
        except OSError:
            LOG.exception('Failed to remove temp pem file %s' % pem_file_path)