#!/usr/bin/env python3
# Copyright 2025 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
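
"""Clean one-time-use (OTU) PCI devices and release them in placement.

This tool watches the child resource providers of a compute node for
PCI-in-placement inventories carrying the HW_PCI_ONE_TIME_USE trait.
When such a provider is fully reserved and has no allocations, the
backing device is cleaned (via NVMe sanitize/format or an
operator-supplied command) and its reservation is cleared so the device
can be allocated again. Cleaning runs once at startup, on
instance.delete.end notifications, and optionally on a periodic timer.
"""
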
import argparse
import collections
import json
import logging
import socket
import subprocess
import sys
import threading
import time
import urllib.parse

from keystoneauth1 import exceptions
from keystoneauth1 import loading
from oslo_concurrency import lockutils
from oslo_config import cfg
import oslo_messaging
from oslo_serialization import jsonutils

LOG = logging.getLogger('cleaner')
loading.register_session_conf_options(cfg.CONF, 'placement')  # noqa
loading.register_auth_conf_options(cfg.CONF, 'placement')  # noqa
loading.register_adapter_conf_options(cfg.CONF, 'placement')  # noqa
cfg.CONF.register_opts([cfg.StrOpt('host')])  # noqa

NVMeDevice = collections.namedtuple('NVMeDevice',
                                    ['controller', 'namespaces', 'ctrl_data'])
RPInventory = collections.namedtuple('RPInventory', [
    'base_path', 'resource_provider_id', 'resource_provider_generation',
    'resource_class', 'total', 'max_unit'])

# Sanitize action code for crypto erase
NVME_SANITIZE_CMD_CRYPT = 0x04

# Feature flag indicating crypto sanitize support
NVME_SANICAP_CRYPT = 0x01
# Feature flag indicating block-erase sanitize support
NVME_SANICAP_BLKERASE = 0x02
# Feature flag indicating overwrite sanitize support
NVME_SANICAP_OVERWRITE = 0x04

# Feature flag indicating crypto format support
NVME_FORMAT_CRYPT = 0x04
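
# NOTE: the NVME_SANICAP_* values mirror the bit positions of the
# SANICAP field in the NVMe Identify Controller data (bit 0: crypto
# erase, bit 1: block erase, bit 2: overwrite).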


class CleaningFailed(Exception):
    pass


def periodic_thread(cleaner, delay):
    while True:
        time.sleep(delay)
        LOG.info('Running periodic clean')
        try:
            cleaner.trigger()
        except Exception as e:
            LOG.exception('Failed to run periodic cleaning: %s', e)


class CleaningEndpoint:
    """A notification listener endpoint that triggers cleaner on delete."""
    filter_rule = oslo_messaging.NotificationFilter(
        event_type=r'instance\.delete\.end$')

    def __init__(self, cleaner, publisher_filter):
        self.cleaner = cleaner
        self.publisher_filter = publisher_filter

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        if publisher_id == self.publisher_filter:
            LOG.info('Instance deleted on %s', publisher_id)
            self.cleaner.trigger()
        else:
            LOG.debug('Ignoring notification from %s != %s',
                      publisher_id, self.publisher_filter)
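
    # oslo.messaging dispatches each notification to a method named for
    # the priority it was emitted at; alias them all to info so we react
    # the same way regardless of level.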
    audit = info
    debug = info
    warn = info
    error = info
    critical = info
    sample = info


class Cleaner:
    """Base class for a cleaner for a given device type"""

    def match(self, pci_addr):
        """Determine if a PCI device matches this type"""
        return False

    def clean(self, rp, pci_addr):
        """Clean a specific device"""
        raise NotImplementedError()


class GenericCleaner(Cleaner):
    def match(self, pci_addr):
        return True

    def clean(self, rp, pci_addr):
        cmd = [self.CMD, pci_addr, rp['uuid']]
        LOG.debug('Running %r', ' '.join(cmd))
        subprocess.check_call(cmd)
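
    # Bind the operator-supplied command into a subclass so the watcher
    # can instantiate and use it like any other Cleaner implementation.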
    @classmethod
    def for_command(cls, cmd):
        class CommandCleaner(cls):
            CMD = cmd

        return CommandCleaner


class NVMECleaner(Cleaner):
    @staticmethod
    def rungetjson(cmd):
        try:
            output = subprocess.check_output(cmd, shell=True)
            return jsonutils.loads(output)
        except subprocess.CalledProcessError as e:
            LOG.error('Failed to run %r: %s', cmd, e)
            raise CleaningFailed('Failed to run nvme utility')
        except json.JSONDecodeError as e:
            LOG.debug('Raw output from %r: %r', cmd, output)
            LOG.error('Failed to parse nvme utility output from %r: %s',
                      cmd, e)
            raise CleaningFailed('Failed to parse nvme utility output')

    def __init__(self):
        data = self.rungetjson('nvme list -v -o json')
        self._nvmes = {}
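        # The JSON layout of 'nvme list -v' changed between nvme-cli v1
        # and v2: v1 lists Controllers directly under each device, while
        # v2 nests them (and subsystem-level Namespaces) under a
        # Subsystems key. Fall back to the device itself when there is
        # no Subsystems key so both layouts are handled.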
        for device in data['Devices']:
            subs = device.get('Subsystems', [device])
            for sub in subs:
                # v2 output has namespaces at the subsystem level
                subsys_ns = [ns['NameSpace']
                             for ns in sub.get('Namespaces', [])]
                for controller in sub['Controllers']:
                    ctrl_data = self.rungetjson(
                        'nvme id-ctrl -o json /dev/%s' % (
                            controller['Controller']))
                    ns = [ns['NameSpace'] for ns in controller['Namespaces']]
                    self._nvmes[controller['Address'].upper()] = NVMeDevice(
                        controller['Controller'], subsys_ns + ns, ctrl_data)
        LOG.debug('Found NVMe devices: %s', self._nvmes)

    def match(self, pci_addr):
        return pci_addr.upper() in self._nvmes

    def _clean_with_format(self, nvme):
        nn = nvme.ctrl_data.get('nn', 0)
        if nn != 1:
            # If the device supports more than one namespace, there
            # may be some data residue outside the current set of
            # configured namespaces that we will not clean
            LOG.warning('NVMe device %s reports %i namespaces '
                        'supported; cleaning may not be complete!',
                        nvme.controller, nn)

        if not nvme.namespaces:
            LOG.error('Device %s does not report any namespaces; '
                      'will not clean!', nvme.controller)
            raise CleaningFailed('Device has no namespaces')

        cleaned = 0
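        # 'nvme format -f -s1' requests a user-data secure erase
        # (SES=1) on the namespace; -f skips the confirmation prompt.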
        for nsdev in nvme.namespaces:
            cmd = 'nvme format -f -s1 /dev/%s' % nsdev
            LOG.debug('Running %r to clean %s', cmd, nsdev)
            subprocess.check_call(cmd, shell=True)
            cleaned += 1
        LOG.debug('Format complete for %s', ','.join(nvme.namespaces))

        # This is a bit silly since we checked nvme.namespaces above, but
        # be positive about what cleaning success means so that if this code
        # changes in the future we won't end up with a falls-out-the-bottom
        # "success".
        # NOTE(danms): There is no guarantee that the device is fully covered
        # by active namespaces and this could result in data being leaked
        # if, for example, a user deleted some namespaces before releasing
        # the device without cleaning it themselves first. Sanitize is the
        # much more robust mechanism for this type of multi-tenant scenario!
        return cleaned > 0 and cleaned == len(nvme.namespaces)

    def _clean_with_sanitize(self, nvme):
        # NOTE(danms): Theoretically we could support non-crypto sanitize
        # here, but it seems like everything that supports sanitize is also
        # likely to support the crypto mechanism which is better anyway.
        cmd = 'nvme sanitize -a 0x%02x /dev/%s' % (NVME_SANITIZE_CMD_CRYPT,
                                                   nvme.controller)
        subprocess.check_call(cmd, shell=True)
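        # Poll the sanitize log with a growing delay (about 55 seconds
        # in total) until the controller reports the global data area
        # as erased.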
        for i in range(10):
            time.sleep(i + 1)
            data = self.rungetjson(
                'nvme sanitize-log -o json /dev/%s' % nvme.controller)
            if data[nvme.controller]['sstat'].get('global_erased') == 1:
                LOG.info('Sanitization complete for %s', nvme.controller)
                return True
            LOG.debug('Waiting for sanitize to complete on %s',
                      nvme.controller)

        raise CleaningFailed(
            'Timed out waiting for sanitize operation on %s' % (
                nvme.controller))

    def clean(self, rp, pci_addr):
        nvme = self._nvmes[pci_addr.upper()]
        sanicap = nvme.ctrl_data.get('sanicap', 0)
        if sanicap & NVME_SANICAP_CRYPT:
            LOG.info('Cleaning %s with crypto sanitize', nvme.controller)
            result = self._clean_with_sanitize(nvme)
        else:
            LOG.info('Cleaning %s with format', nvme.controller)
            result = self._clean_with_format(nvme)

        if result is not True:
            raise CleaningFailed('Cleaning did not positively signal success')


class CleaningWatcher:
    def __init__(self, session, rp_name, cleaners, dry_run=False):
        self.session = session
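        # Nova publishes notifications with a publisher_id of the form
        # 'nova-compute:<host>', so filter on that to react only to
        # deletes from our own compute node.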
        self.endpoint = CleaningEndpoint(self, 'nova-compute:%s' % rp_name)
        self.cleaners = cleaners
        self.dryrun = dry_run
        self.interface = cfg.CONF.placement.valid_interfaces[0]

        ep = session.get_endpoint(
            session.auth,
            service_type='placement',
            interface=self.interface)
        self.baseurl = urllib.parse.urlparse(ep).path
        LOG.debug('Placement endpoint baseurl is %s', self.baseurl)

        rps = self._getjson('/resource_providers',
                            params={'name': rp_name})['resource_providers']
        if len(rps) != 1:
            raise RuntimeError('Failed to find a resource provider for %r' % (
                rp_name))
        self.rp_id = rps[0]['uuid']
        LOG.debug('Found uuid %s for resource provider %r',
                  self.rp_id, rp_name)

    def link_for_rp(self, rp, linktype):
        """Return the URL for a named link relation"""
        for link in rp['links']:
            if link['rel'] == linktype:
                # This link will have the full path as used to access it,
                # but keystoneauth will re-add the baseurl (i.e. /placement)
                # to it, so strip that prefix here.
                return link['href'].removeprefix(self.baseurl)

    def _getjson(self, url, **kwargs):
        return self.session.get(
            url, microversion='1.14',
            endpoint_filter={'service_type': 'placement',
                             'interface': self.interface},
            **kwargs).json()

    def _putjson(self, url, data):
        return self.session.put(
            url, json=data, microversion='1.14',
            endpoint_filter={'service_type': 'placement',
                             'interface': self.interface})

    def traits_for_rp(self, rp):
        # NOTE(danms): openstacksdk has no support for resource provider
        # traits, so query directly here.
        url = self.link_for_rp(rp, 'traits')
        return self._getjson(url)['traits']

    def allocations_for_rp(self, rp):
        # NOTE(danms): openstacksdk has no support for resource provider
        # allocations, so query directly here.
        url = self.link_for_rp(rp, 'allocations')
        return self._getjson(url)['allocations']

    def inventories_for_rp(self, rp):
        url = self.link_for_rp(rp, 'inventories')
        return self._getjson(url)

    def rp_inventory_to_clean(self, rp):
        rpinv = self.inventories_for_rp(rp)
        inventory = rpinv['inventories']
        generation = rpinv['resource_provider_generation']
        if len(inventory) != 1:
            # This provider has the OTU trait but does not look like a
            # PCI-in-placement provider, so skip it for safety.
            LOG.warning(
                'Skipping provider %s because it has %i inventories',
                rp['uuid'], len(inventory))
            return
        # OTU devices are "burned" when they have reserved == total
        rc = list(inventory.keys())[0]
        is_reserved = inventory[rc]['reserved'] == inventory[rc]['total']
        allocs = self.allocations_for_rp(rp)
        LOG.debug('Allocations for %s: %s', rp['uuid'], allocs)
        LOG.debug('Inventory for %s: %s', rp['uuid'], inventory)
        # A provider needs cleaning if it is reserved and has no allocations
        if is_reserved and not allocs:
            rpinv = RPInventory(self.link_for_rp(rp, 'inventories'),
                                rp['uuid'],
                                generation, rc,
                                inventory[rc]['total'],
                                inventory[rc]['max_unit'])
            return rpinv

    def pci_addr_for_rp(self, rp):
        # This format is defined in the PCI-in-placement work as:
        # hostname_DDDD:BB:SS.FF
        host, addr = rp['name'].split('_')
        return addr

    def unreserve(self, inventory):
        LOG.info('Unreserving %s on %s', inventory.resource_class,
                 inventory.resource_provider_id)
        # NOTE(danms): openstacksdk apparently can't change *just* reserved
        # for a provider inventory. Placement requires total be in the
        # payload, which the sdk will not include if it thinks it is
        # unchanged. So, manually PUT the inventory here.
        url = '/'.join([
            inventory.base_path % {
                'resource_provider_id': inventory.resource_provider_id},
            inventory.resource_class])
        inventory = {
            'resource_provider_generation': (
                inventory.resource_provider_generation),
            'reserved': 0,
            'total': inventory.total,
            'max_unit': inventory.max_unit,
        }

        r = self._putjson(url, inventory)
        r.raise_for_status()

    def run_cleaner(self, rp, pci_addr, cleaner, inventory):
        if self.dryrun:
            LOG.info('Would use %s to clean %s %s',
                     cleaner.__class__.__name__,
                     inventory.resource_provider_id,
                     inventory.resource_class)
            return
        try:
            cleaner.clean(rp, pci_addr)
            self.unreserve(inventory)
        except subprocess.CalledProcessError as e:
            LOG.error('Failed to clean %s (%s) - action returned %i',
                      pci_addr, rp['uuid'], e.returncode)
        except exceptions.HttpError as e:
            LOG.error('Failed to unreserve %s: %s', rp['uuid'], e)
        except Exception as e:
            LOG.exception('Unexpected error cleaning %s (%s): %s',
                          pci_addr, rp['uuid'], e)
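
    # Serialize trigger() with a lock since it can be called concurrently
    # from the notification listener and the periodic thread.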
    @lockutils.synchronized('cleaner')
    def trigger(self):
        children = self._getjson(
            '/resource_providers',
            params={'in_tree': self.rp_id})['resource_providers']
        cleaners = [cleaner_cls() for cleaner_cls in self.cleaners]
        for child in children:
            is_otu = 'HW_PCI_ONE_TIME_USE' in self.traits_for_rp(child)
            if not is_otu:
                continue
            inventory_to_clean = self.rp_inventory_to_clean(child)
            needs_cleaning = is_otu and inventory_to_clean
            LOG.debug('Provider %s needs cleaning: %s (otu=%s)',
                      child['uuid'], needs_cleaning, is_otu)
            if not needs_cleaning:
                continue
            pci_addr = self.pci_addr_for_rp(child)
            matched_cleaners = [cleaner for cleaner in cleaners
                                if cleaner.match(pci_addr)]
            for cleaner in matched_cleaners:
                self.run_cleaner(child, pci_addr, cleaner, inventory_to_clean)
            if not matched_cleaners and needs_cleaning:
                # Unsupported device type, don't clean
                LOG.warning('Child provider %s for %s needs cleaning but no '
                            'cleaning implementations matched!',
                            child['uuid'], pci_addr)


class LoggerEndpoint:
    """A notification endpoint that just logs each event received for debug"""

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        LOG.info('Event: %s %s', publisher_id, event_type)


def main():
    p = argparse.ArgumentParser()
    p.add_argument('--topic', default=None,
                   help=('Notification queue. Defaults to '
                         'notifications-$provider_name for recommended '
                         'per-compute topic configuration'))
    p.add_argument('--debug', action='store_true',
                   help='Enable verbose debug logging')
    p.add_argument('--provider-name', default=None,
                   help=('Parent provider name for this compute node '
                         '(if unspecified, attempt to detect)'))
    p.add_argument('--one', action='store_true',
                   help='Run one cleaning pass and exit')
    p.add_argument('--dry-run', action='store_true',
                   help=('Do not actually clean, just print what would '
                         'be done'))
    p.add_argument('--pool', default=None,
                   help=('oslo.messaging receiver pool name (see caveats '
                         'about this usage)'))
    p.add_argument('--periodic', type=int, default=0,
                   help='Scan for missed required cleaning every N minutes')
    p.add_argument('--nvme', action='store_true',
                   help=('Clean NVMe devices with `nvme format` on all '
                         'namespaces'))
    p.add_argument('--generic-cmd',
                   help=('Run this command for each PCI device needing to '
                         'be cleaned. Arguments are "pci_addr rp_uuid"'))
    args = p.parse_args()
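
    # Example invocation (illustrative): clean NVMe devices, rescanning
    # every 30 minutes in addition to reacting to delete notifications:
    #   cleaner.py --nvme --periodic 30 --provider-name compute-1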
    cfg.CONF([], project='nova')

    auth = loading.load_auth_from_conf_options(cfg.CONF, 'placement')
    sess = loading.load_session_from_conf_options(cfg.CONF, 'placement',
                                                  auth=auth)

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO)
    if not args.provider_name:
        if cfg.CONF.host:
            args.provider_name = cfg.CONF.host
        else:
            args.provider_name = socket.gethostname().split('.')[0]
    if not args.topic:
        args.topic = 'notifications-%s' % args.provider_name

    # This could be dynamic based on arguments to control what things we do
    cleaners = []
    if args.nvme:
        cleaners.append(NVMECleaner)
    if args.generic_cmd:
        cleaners.append(GenericCleaner.for_command(args.generic_cmd))
    if len(cleaners) > 1:
        # NOTE(danms): This should be supportable, but it complicates our
        # "when to unreserve" logic, so just keep this simple at the moment.
        print('Only one type of cleaner may be enabled at a time')
        return 1
    c = CleaningWatcher(sess, args.provider_name, cleaners,
                        dry_run=args.dry_run)
    # Trigger at least once at startup before we potentially start processing
    # notifications
    c.trigger()

    if args.one:
        return

    if args.periodic:
        periodic = threading.Thread(target=periodic_thread,
                                    args=(c, args.periodic * 60))
        periodic.daemon = True
        periodic.start()

    transport = oslo_messaging.get_notification_transport(cfg.CONF)
    targets = [oslo_messaging.Target(topic=args.topic, exchange='nova')]
    endpoints = [c.endpoint]
    if args.debug:
        endpoints.insert(0, LoggerEndpoint())

    server = oslo_messaging.get_notification_listener(transport, targets,
                                                      endpoints,
                                                      pool=args.pool)
    server.start()
    try:
        server.wait()
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    sys.exit(main())