3fe42b53fd
Primarily remove the workaround added in Ia6d512ff2ae417bab938cb095fbb0884d195010a which added continued use of autocommit, which is incompatible with SQLAlchemy 2.0. Also set the environment for unit tests to report compatibility warnings, although it appears none are being reported at this time. Also cuts out the db upgrade cruft to only use the online database migration code through oslo_db's enginefacade, which has the smarts to handle online or offline migrations. And then, retools unit/functional test data storage to utilize sqlite, and in that re-tooled the queries to prevent locking conditions which could exist with queries, and some additional refactoring/cleanup. Also, don't mock and test time.sleep(). Additionally, it looks like we have discovered the root cause of the memory/connection leakage issue which has been observed, due to the way lists of nodes are processed/returned. This change was based upon the work in I506da42a9891a245831f325e34bec92e0a3f33f0 which is included in this commit as the entire database structure and interaction has been modified for ironic-inspector. Co-Authored-By: aarefiev <aarefiev@mirantis.com> Story: 2009727 Task: 44132 Change-Id: Ic88eb9dec5fddc924a72d9a23c17a304954ebf46
388 lines
15 KiB
Python
388 lines
15 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Handling introspection data from the ramdisk."""
|
|
|
|
import copy
|
|
import datetime
|
|
import os
|
|
|
|
from oslo_config import cfg
|
|
from oslo_serialization import base64
|
|
from oslo_utils import excutils
|
|
from oslo_utils import timeutils
|
|
|
|
from ironic_inspector.common.i18n import _
|
|
from ironic_inspector.common import ironic as ir_utils
|
|
from ironic_inspector import introspection_state as istate
|
|
from ironic_inspector import node_cache
|
|
from ironic_inspector.plugins import base as plugins_base
|
|
from ironic_inspector.pxe_filter import base as pxe_filter
|
|
from ironic_inspector import rules
|
|
from ironic_inspector import utils
|
|
|
|
# Global oslo.config object; this module reads options from its
# ``processing`` group.
CONF = cfg.CONF
|
|
|
|
# Module logger; calls in this file pass ``node_info`` and ``data``
# keyword arguments to it.
LOG = utils.getProcessingLogger(__name__)
|
|
|
|
# Keys that _filter_data_excluded_keys() drops from introspection data.
_STORAGE_EXCLUDED_KEYS = {'logs'}
|
|
|
|
|
|
def _store_logs(introspection_data, node_info):
    """Write the ramdisk logs found in the introspection data to a file.

    The logs are read from the ``logs`` key, base64-decoded and written
    to a file inside ``[processing]ramdisk_logs_dir``. The file name is
    produced from ``[processing]ramdisk_logs_filename_format``, which may
    use the ``uuid``, ``mac``, ``dt`` and ``bmc`` fields.

    Nothing is written (only a warning is logged) when no logs were
    passed or the logs directory is not configured. An OSError raised
    while creating the directory or writing the file is logged and not
    propagated.

    :param introspection_data: introspection data received from the ramdisk
    :param node_info: node information object, or None if no node is known
    """
    logs = introspection_data.get('logs')
    if not logs:
        LOG.warning('No logs were passed by the ramdisk',
                    data=introspection_data, node_info=node_info)
        return

    logs_dir = CONF.processing.ramdisk_logs_dir
    if not logs_dir:
        LOG.warning('Failed to store logs received from the ramdisk '
                    'because ramdisk_logs_dir configuration option '
                    'is not set',
                    data=introspection_data, node_info=node_info)
        return

    fmt_args = {
        'uuid': node_info.uuid if node_info is not None else 'unknown',
        'mac': (utils.get_pxe_mac(introspection_data) or
                'unknown').replace(':', ''),
        # timeutils.utcnow() is what the rest of this module uses and it
        # avoids the deprecated datetime.datetime.utcnow().
        'dt': timeutils.utcnow(),
        'bmc': (utils.get_ipmi_address_from_data(introspection_data) or
                'unknown')
    }

    file_name = CONF.processing.ramdisk_logs_filename_format.format(**fmt_args)

    try:
        # exist_ok=True removes the check-then-create race: with a
        # separate os.path.exists() test two concurrent requests could
        # both try to create the directory and one of them would lose
        # its logs to FileExistsError.
        os.makedirs(logs_dir, exist_ok=True)
        with open(os.path.join(logs_dir, file_name), 'wb') as fp:
            fp.write(base64.decode_as_bytes(logs))
    except OSError:
        LOG.exception('Could not store the ramdisk logs',
                      data=introspection_data, node_info=node_info)
    else:
        LOG.info('Ramdisk logs were stored in file %s', file_name,
                 data=introspection_data, node_info=node_info)
|
|
|
|
|
|
def _find_node_info(introspection_data, failures):
    """Locate the node that the introspection data belongs to.

    The node is first looked up through ``node_cache.find_node`` using
    the BMC addresses and MACs extracted from the data. If it is not
    found there and ``[processing]permit_active_introspection`` is set,
    ``node_cache.record_node`` is tried. As a last resort the configured
    node-not-found hook, if any, is asked to produce the node info.

    :param introspection_data: introspection data received from the ramdisk
    :param failures: list that error messages are appended to
    :returns: the node info found, or None when the look up failed (in
              which case a message was appended to ``failures``)
    """
    try:
        ipv4_address = utils.get_ipmi_address_from_data(introspection_data)
        ipv6_address = utils.get_ipmi_v6address_from_data(introspection_data)
        bmc_addresses = [candidate
                         for candidate in (ipv4_address, ipv6_address)
                         if candidate]
        macs = utils.get_valid_macs(introspection_data)
        return node_cache.find_node(bmc_address=bmc_addresses, mac=macs)
    except utils.NotFoundInCacheError as exc:
        if CONF.processing.permit_active_introspection:
            try:
                return node_cache.record_node(bmc_addresses=bmc_addresses,
                                              macs=macs)
            except utils.NotFoundInCacheError:
                mac_text = ', '.join(macs) if macs else None
                # bmc_addresses holds only non-empty values at this point
                addr_text = ', '.join(bmc_addresses) or None
                LOG.debug(
                    'Active nodes introspection is enabled, but no node '
                    'was found for MAC(s) %(mac)s and BMC address(es) '
                    '%(addr)s; proceeding with discovery',
                    {'mac': mac_text, 'addr': addr_text})

        not_found_hook = plugins_base.node_not_found_hook_manager()
        if not_found_hook is None:
            failures.append(_('Look up error: %s') % exc)
            return None

        LOG.debug('Running node_not_found_hook %s',
                  CONF.processing.node_not_found_hook,
                  data=introspection_data)

        # NOTE(sambetts): A node-not-found hook is defined, so give it a
        # chance to generate the node info from the introspection data
        # before the look up error is reported.
        try:
            hook_result = not_found_hook.driver(introspection_data)
            if hook_result:
                return hook_result
            failures.append(_("Node not found hook returned nothing"))
        except Exception as hook_exc:
            failures.append(_("Node not found hook failed: %s") % hook_exc)
    except utils.Error as exc:
        failures.append(_('Look up error: %s') % exc)
|
|
|
|
|
|
def _run_pre_hooks(introspection_data, failures):
    """Run the ``before_processing`` step of every processing hook.

    Exceptions raised by a hook are logged and recorded in ``failures``
    instead of being propagated, so the remaining hooks still run.

    :param introspection_data: introspection data passed to each hook
    :param failures: list that error messages are appended to
    """
    for extension in plugins_base.processing_hooks_manager():
        hook_name = extension.name
        LOG.debug('Running pre-processing hook %s', hook_name,
                  data=introspection_data)
        # NOTE(dtantsur): exceptions are caught here so that there is a
        # chance to update the node introspection status after look up
        try:
            extension.obj.before_processing(introspection_data)
        except utils.Error as exc:
            details = {'hook': hook_name, 'error': exc}
            LOG.error('Hook %(hook)s failed, delaying error report '
                      'until node look up: %(error)s',
                      details,
                      data=introspection_data)
            failures.append('Preprocessing hook %(hook)s: %(error)s' %
                            details)
        except Exception as exc:
            LOG.exception('Hook %(hook)s failed, delaying error report '
                          'until node look up: %(error)s',
                          {'hook': hook_name, 'error': exc},
                          data=introspection_data)
            details = {'hook': hook_name,
                       'exc_class': exc.__class__.__name__,
                       'error': exc}
            failures.append(_('Unexpected exception %(exc_class)s during '
                              'preprocessing in hook %(hook)s: %(error)s') %
                            details)
|
|
|
|
|
|
def _filter_data_excluded_keys(data):
    """Return a copy of ``data`` without the keys excluded from storage.

    :param data: dictionary with introspection data
    :returns: new dictionary lacking the keys in _STORAGE_EXCLUDED_KEYS
    """
    kept = {}
    for key, value in data.items():
        if key in _STORAGE_EXCLUDED_KEYS:
            continue
        kept[key] = value
    return kept
|
|
|
|
|
|
def store_introspection_data(node_uuid, data, processed=True):
    """Save introspection data using the configured storage backend.

    The backend is the introspection data plugin named by the
    ``[processing]store_data`` option.

    :param node_uuid: node UUID
    :param data: Introspection data to be saved
    :param processed: True when ``data`` is processed introspection data,
                      False when it is unprocessed.
    :raises: utils.Error
    """
    manager = plugins_base.introspection_data_manager()
    backend = manager[CONF.processing.store_data].obj
    backend.save(node_uuid, data, processed)
|
|
|
|
|
|
def _store_unprocessed_data(node_uuid, data):
    """Store unprocessed introspection data, logging any failure.

    Any exception raised while saving is logged and swallowed.

    :param node_uuid: node UUID
    :param data: unprocessed introspection data to be saved
    """
    try:
        store_introspection_data(node_uuid, data, processed=False)
    except Exception:
        LOG.exception(
            'Encountered exception saving unprocessed introspection '
            'data for node %s',
            node_uuid, data=data)
|
|
|
|
|
|
def get_introspection_data(uuid, processed=True, get_json=False):
    """Read introspection data using the configured storage backend.

    The backend is the introspection data plugin named by the
    ``[processing]store_data`` option.

    :param uuid: node UUID
    :param processed: set True to request processed introspection data,
                      False to request the unprocessed data.
    :param get_json: whether to return the introspection data in json
                     format; a string value is returned if False.
    :returns: whatever the backend's ``get`` call returns
    :raises: utils.Error
    """
    manager = plugins_base.introspection_data_manager()
    backend = manager[CONF.processing.store_data].obj
    return backend.get(uuid, processed=processed, get_json=get_json)
|
|
|
|
|
|
def process(introspection_data):
    """Process data from the ramdisk.

    This function heavily relies on the hooks to do the actual data
    processing: pre-processing hooks run first, then the node is looked
    up, the unprocessed data is stored and the node is processed.

    :param introspection_data: introspection data received from the ramdisk
    :returns: the result of processing the node
    :raises: utils.Error on pre-processing or look up failures, when the
             node's introspection has already finished, or on processing
             errors
    """
    # Copy taken before the hooks run; this is what gets stored as the
    # unprocessed data further down.
    raw_data = copy.deepcopy(introspection_data)
    failures = []
    _run_pre_hooks(introspection_data, failures)
    node_info = _find_node_info(introspection_data, failures)
    if node_info:
        # Locking is already done in find_node() but may be not done in a
        # node_not_found hook
        node_info.acquire_lock()

    if failures or node_info is None:
        failure_text = '\n'.join(failures)
        msg = _('The following failures happened during running '
                'pre-processing hooks:\n%s') % failure_text
        if node_info is not None:
            node_info.finished(istate.Events.error, error=failure_text)
        _store_logs(introspection_data, node_info)
        raise utils.Error(msg, node_info=node_info, data=introspection_data)

    LOG.info('Matching node is %s', node_info.uuid,
             node_info=node_info, data=introspection_data)

    if node_info.finished_at is not None:
        # race condition or introspection canceled
        already_finished = _('Node processing already finished with '
                             'error: %s') % node_info.error
        raise utils.Error(already_finished, node_info=node_info, code=400)

    # NOTE(TheJulia): this was previously called as a background
    # process, but we can't do that with sqlite.
    _store_unprocessed_data(node_info.uuid, raw_data)

    try:
        node = node_info.node()
    except ir_utils.NotFound as exc:
        with excutils.save_and_reraise_exception():
            node_info.finished(istate.Events.error, error=str(exc))
            _store_logs(introspection_data, node_info)

    try:
        result = _process_node(node_info, node, introspection_data)
    except utils.Error as exc:
        node_info.finished(istate.Events.error, error=str(exc))
        with excutils.save_and_reraise_exception():
            _store_logs(introspection_data, node_info)
    except Exception as exc:
        LOG.exception('Unexpected exception during processing')
        details = {'exc_class': exc.__class__.__name__, 'error': exc}
        msg = _('Unexpected exception %(exc_class)s during processing: '
                '%(error)s') % details
        node_info.finished(istate.Events.error, error=msg)
        _store_logs(introspection_data, node_info)
        raise utils.Error(msg, node_info=node_info, data=introspection_data,
                          code=500)

    if CONF.processing.always_store_ramdisk_logs:
        _store_logs(introspection_data, node_info)
    return result
|
|
|
|
|
|
def _run_post_hooks(node_info, introspection_data):
    """Run the ``before_update`` step of every processing hook.

    Exceptions raised by a hook are not handled here.

    :param node_info: node information object passed to each hook
    :param introspection_data: introspection data passed to each hook
    """
    for extension in plugins_base.processing_hooks_manager():
        LOG.debug('Running post-processing hook %s', extension.name,
                  node_info=node_info, data=introspection_data)
        extension.obj.before_update(introspection_data, node_info)
|
|
|
|
|
|
@node_cache.fsm_transition(istate.Events.process, reentrant=False)
def _process_node(node_info, node, introspection_data):
    """Run post-processing for a node and schedule the final step.

    Runs the post-processing hooks, stores the processed data, syncs the
    PXE filter, applies the introspection rules and submits ``_finish``
    to the executor.

    :param node_info: node information object
    :param node: node object, checked for its provision state
    :param introspection_data: introspection data being processed
    :returns: dictionary with the node ``uuid``
    """
    # NOTE(dtantsur): repeat the check in case something changed
    keep_power_on = ir_utils.check_provision_state(node)

    _run_post_hooks(node_info, introspection_data)
    store_introspection_data(node_info.uuid, introspection_data)

    ironic = ir_utils.get_client()
    pxe_filter.driver().sync(ironic)

    node_info.invalidate_cache()
    rules.apply(node_info, introspection_data)

    resp = {'uuid': node.id}

    # Power is only touched when the node does not have to stay on and
    # its boot is managed; then the power_off option decides.
    may_power_off = not keep_power_on and node_info.manage_boot
    power_action = CONF.processing.power_off if may_power_off else False
    utils.executor().submit(_finish, node_info, ironic, introspection_data,
                            power_off=power_action)

    return resp
|
|
|
|
|
|
@node_cache.triggers_fsm_error_transition()
def _finish(node_info, ironic, introspection_data, power_off=True):
    """Optionally power off the node and mark introspection as finished.

    A power-off failure is ignored (and only logged) when the node's
    provision state is ``enroll``; otherwise it is raised as utils.Error.

    :param node_info: node information object
    :param ironic: client used to set the node power state
    :param introspection_data: introspection data, used for logging
    :param power_off: whether to power the node off before finishing
    :raises: utils.Error if powering off fails for a node that is not in
             the ``enroll`` provision state
    """
    if power_off:
        LOG.debug('Forcing power off of node %s', node_info.uuid)
        try:
            ironic.set_node_power_state(node_info.uuid, 'power off')
        except Exception as exc:
            if node_info.node().provision_state == 'enroll':
                LOG.info("Failed to power off the node in "
                         "'enroll' state, ignoring; error was "
                         "%s", exc, node_info=node_info,
                         data=introspection_data)
            else:
                msg = (_('Failed to power off node %(node)s, check '
                         'its power management configuration: '
                         '%(exc)s') % {'node': node_info.uuid, 'exc':
                                       exc})
                raise utils.Error(msg, node_info=node_info,
                                  data=introspection_data)
        else:
            # Only claim success when the power-off request did not fail;
            # an ignored failure for an enroll node must not be reported
            # as a powered-off node.
            LOG.info('Node powered-off', node_info=node_info,
                     data=introspection_data)

    node_info.finished(istate.Events.finish)
    LOG.info('Introspection finished successfully',
             node_info=node_info, data=introspection_data)
|
|
|
|
|
|
def reapply(node_uuid, data=None):
    """Re-apply introspection steps.

    Re-apply preprocessing, postprocessing and introspection rules on
    stored data. The actual work is submitted to the executor once the
    node lock has been acquired without blocking.

    :param node_uuid: node UUID
    :param data: unprocessed introspection data to be reapplied
    :raises: utils.Error (code 409) if the node lock cannot be acquired
    """
    LOG.debug('Processing re-apply introspection request for node '
              'UUID: %s', node_uuid)
    node_info = node_cache.get_node(node_uuid)
    lock_acquired = node_info.acquire_lock(blocking=False)
    if not lock_acquired:
        # Note (mkovacik): it should be sufficient to check data
        # presence & locking. If either introspection didn't start
        # yet, was in waiting state or didn't finish yet, either data
        # won't be available or locking would fail
        raise utils.Error(_('Node locked, please, try again later'),
                          node_info=node_info, code=409)

    utils.executor().submit(_reapply, node_info, introspection_data=data)
|
|
|
|
|
|
def _reapply(node_info, introspection_data=None):
    """Re-apply introspection on the given data; runs in background.

    Resets the node's start time, obtains the Ironic client, re-runs the
    processing on the data and finishes without powering the node off.
    Failures are logged and end the function early instead of being
    raised.

    :param node_info: node information object
    :param introspection_data: introspection data to be reapplied
    """
    node_info.started_at = timeutils.utcnow()
    node_info.commit()

    try:
        ironic = ir_utils.get_client()
    except Exception as client_exc:
        msg = _('Encountered an exception while getting the Ironic client: '
                '%s') % client_exc
        LOG.error(msg, node_info=node_info, data=introspection_data)
        node_info.finished(istate.Events.error, error=msg)
        return

    try:
        _reapply_with_data(node_info, introspection_data)
    except Exception as reapply_exc:
        details = {'node': node_info.uuid, 'exc': reapply_exc}
        msg = (_('Failed reapply for node %(node)s, Error: '
                 '%(exc)s') % details)
        LOG.error(msg, node_info=node_info, data=introspection_data)
        return

    _finish(node_info, ironic, introspection_data, power_off=False)

    LOG.info('Successfully reapplied introspection on stored '
             'data', node_info=node_info, data=introspection_data)
|
|
|
|
|
|
@node_cache.fsm_event_before(istate.Events.reapply)
@node_cache.triggers_fsm_error_transition()
def _reapply_with_data(node_info, introspection_data):
    """Re-run hooks, storage and rules on existing introspection data.

    :param node_info: node information object
    :param introspection_data: introspection data to be reapplied
    :raises: utils.Error if any pre-processing hook reported a failure
    """
    failures = []
    _run_pre_hooks(introspection_data, failures)
    if failures:
        failure_text = '\n'.join(failures)
        raise utils.Error(_('Pre-processing failures detected reapplying '
                            'introspection on stored data:\n%s') %
                          failure_text, node_info=node_info)

    _run_post_hooks(node_info, introspection_data)
    store_introspection_data(node_info.uuid, introspection_data)
    node_info.invalidate_cache()
    rules.apply(node_info, introspection_data)
|