#!/usr/bin/env python3
# Copyright 2025 Red Hat
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Clean one-time-use PCI devices and release them back to placement.

Listens for nova instance.delete.end notifications (and optionally runs a
periodic scan) and, for each child resource provider of this compute node
that has the HW_PCI_ONE_TIME_USE trait, a fully-reserved inventory, and no
allocations, runs a cleaning action (NVMe sanitize/format or a generic
command) and then resets the inventory's reserved count to zero.
"""

import argparse
import collections
import json
import logging
import socket
import subprocess
import sys
import threading
import time
import urllib.parse

from keystoneauth1 import exceptions
from keystoneauth1 import loading
from oslo_concurrency import lockutils
from oslo_config import cfg
import oslo_messaging
from oslo_serialization import jsonutils

LOG = logging.getLogger('cleaner')

loading.register_session_conf_options(cfg.CONF, 'placement')  # noqa
loading.register_auth_conf_options(cfg.CONF, 'placement')  # noqa
loading.register_adapter_conf_options(cfg.CONF, 'placement')  # noqa
cfg.CONF.register_opts([cfg.StrOpt('host')])  # noqa

NVMeDevice = collections.namedtuple(
    'NVMeDevice', ['controller', 'namespaces', 'ctrl_data'])
RPInventory = collections.namedtuple('RPInventory', [
    'base_path', 'resource_provider_id', 'resource_provider_generation',
    'resource_class', 'total', 'max_unit'])

# Command to sanitize via crypto
NVME_SANITIZE_CMD_CRYPT = 0x04
# Feature flag indicating crypto sanitize support
NVME_SANICAP_CRYPT = 0x01
# Feature flag indicating block-erase sanitize support
NVME_SANICAP_BLKERASE = 0x02
# Feature flag indicating overwrite sanitize support
NVME_SANICAP_OVERWRITE = 0x04
# Feature flag indicating crypto format support
NVME_FORMAT_CRYPT = 0x04


class CleaningFailed(Exception):
    pass


def periodic_thread(cleaner, delay):
    while True:
        time.sleep(delay)
        LOG.info('Running periodic clean')
        try:
            cleaner.trigger()
        except Exception as e:
            LOG.exception('Failed to run periodic cleaning: %s', e)


class CleaningEndpoint:
    """A notification listener endpoint that triggers cleaner on delete."""
    filter_rule = oslo_messaging.NotificationFilter(
        event_type=r'instance\.delete\.end$')

    def __init__(self, cleaner, publisher_filter):
        self.cleaner = cleaner
        self.publisher_filter = publisher_filter

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        if publisher_id == self.publisher_filter:
            LOG.info('Instance deleted on %s', publisher_id)
            self.cleaner.trigger()
        else:
            LOG.debug('Ignoring notification from %s != %s',
                      publisher_id, self.publisher_filter)

    audit = info
    debug = info
    warn = info
    error = info
    critical = info
    sample = info


class Cleaner:
    """Base class for a cleaner for a given device type"""

    def match(self, pci_addr):
        """Determine if a PCI device matches this type"""
        return False

    def clean(self, rp, pci_addr):
        """Clean a specific device"""
        raise NotImplementedError()


class GenericCleaner(Cleaner):
    def match(self, pci_addr):
        return True

    def clean(self, rp, pci_addr):
        cmd = [self.CMD, pci_addr, rp['uuid']]
        LOG.debug('Running %r', ' '.join(cmd))
        subprocess.check_call(cmd)

    @classmethod
    def for_command(cls, cmd):
        class CommandCleaner(cls):
            CMD = cmd
        return CommandCleaner


class NVMECleaner(Cleaner):
    @staticmethod
    def rungetjson(cmd):
        try:
            output = subprocess.check_output(cmd, shell=True)
            return jsonutils.loads(output)
        except subprocess.CalledProcessError as e:
            LOG.error('Failed to run %r: %s', cmd, e)
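            # Re-raise as CleaningFailed so callers only have to handle one
            # exception type for any nvme-cli failure.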
            raise CleaningFailed('Failed to run nvme utility')
        except json.JSONDecodeError as e:
            LOG.debug('Raw output from %r: %r', cmd, output)
            LOG.error('Failed to parse nvme utility output from %r: %s',
                      cmd, e)
            raise CleaningFailed('Failed to parse nvme utility output')

    def __init__(self):
        data = self.rungetjson('nvme list -v -o json')
        self._nvmes = {}
        for device in data['Devices']:
            subs = device.get('Subsystems', [device])
            for sub in subs:
                # v2 output has namespaces at the subsystem level
                subsys_ns = [ns['NameSpace']
                             for ns in sub.get('Namespaces', [])]
                for controller in sub['Controllers']:
                    ctrl_data = self.rungetjson(
                        'nvme id-ctrl -o json /dev/%s' % (
                            controller['Controller']))
                    ns = [ns['NameSpace']
                          for ns in controller['Namespaces']]
                    self._nvmes[controller['Address'].upper()] = NVMeDevice(
                        controller['Controller'], subsys_ns + ns, ctrl_data)
        LOG.debug('Found NVMe devices: %s', self._nvmes)

    def match(self, pci_addr):
        return pci_addr.upper() in self._nvmes

    def _clean_with_format(self, nvme):
        nn = nvme.ctrl_data.get('nn', 0)
        if nn != 1:
            # If the device supports more than one namespace, there
            # may be some data residue outside the current set of
            # configured namespaces that we will not clean
            LOG.warning('NVMe device %s reports %i namespaces '
                        'supported; cleaning may not be complete!',
                        nvme.controller, nn)

        if not nvme.namespaces:
            LOG.error('Device %s does not report any namespaces; '
                      'will not clean!', nvme.controller)
            raise CleaningFailed('Device has no namespaces')

        cleaned = 0
        for nsdev in nvme.namespaces:
            cmd = 'nvme format -f -s1 /dev/%s' % nsdev
            LOG.debug('Running %r to clean %s', cmd, nsdev)
            subprocess.check_call(cmd, shell=True)
            cleaned += 1
        LOG.debug('Format complete for %s', ','.join(nvme.namespaces))

        # This is a bit silly since we checked nvme.namespaces above, but
        # be positive about what cleaning success means so that if this code
        # changes in the future we won't end up with a falls-out-the-bottom
        # "success".
        # NOTE(danms): There is no guarantee that the device is fully covered
        # by active namespaces and this could result in data being leaked
        # if, for example, a user deleted some namespaces before releasing
        # the device without cleaning it themselves first. Sanitize is the
        # much more robust mechanism for this type of multi-tenant scenario!
        return cleaned > 0 and cleaned == len(nvme.namespaces)

    def _clean_with_sanitize(self, nvme):
        # NOTE(danms): Theoretically we could support non-crypto sanitize
        # here, but it seems like everything that supports sanitize is also
        # likely to support the crypto mechanism, which is better anyway.
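        # Crypto sanitize (sanitize action 0x4) covers the whole device,
        # including deallocated blocks and space outside the configured
        # namespaces, by changing the media encryption key.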
        cmd = 'nvme sanitize -a 0x%02x /dev/%s' % (NVME_SANITIZE_CMD_CRYPT,
                                                   nvme.controller)
        subprocess.check_call(cmd, shell=True)
        for i in range(10):
            time.sleep(i + 1)
            data = self.rungetjson(
                'nvme sanitize-log -o json /dev/%s' % nvme.controller)
            if data[nvme.controller]['sstat'].get('global_erased') == 1:
                LOG.info('Sanitization complete for %s', nvme.controller)
                return True
            LOG.debug('Waiting for sanitize to complete on %s',
                      nvme.controller)
        raise CleaningFailed(
            'Timed out waiting for sanitize operation on %s' % (
                nvme.controller))

    def clean(self, rp, pci_addr):
        nvme = self._nvmes[pci_addr.upper()]
        sanicap = nvme.ctrl_data.get('sanicap', 0)
        if sanicap & NVME_SANICAP_CRYPT:
            LOG.info('Cleaning %s with crypto sanitize', nvme.controller)
            result = self._clean_with_sanitize(nvme)
        else:
            LOG.info('Cleaning %s with format', nvme.controller)
            result = self._clean_with_format(nvme)
        if result is not True:
            raise CleaningFailed('Cleaning did not positively signal success')


class CleaningWatcher:
    def __init__(self, session, rp_name, cleaners, dry_run=False):
        self.session = session
        self.endpoint = CleaningEndpoint(self, 'nova-compute:%s' % rp_name)
        self.cleaners = cleaners
        self.dryrun = dry_run
        self.interface = cfg.CONF.placement.valid_interfaces[0]
        ep = session.get_endpoint(session.auth, service_type='placement',
                                  interface=self.interface)
        self.baseurl = urllib.parse.urlparse(ep).path
        LOG.debug('Placement endpoint baseurl is %s', self.baseurl)
        rps = self._getjson(
            '/resource_providers',
            params={'name': rp_name})['resource_providers']
        if len(rps) != 1:
            raise RuntimeError('Failed to find a resource provider for %r' % (
                rp_name))
        self.rp_id = rps[0]['uuid']
        LOG.debug('Found uuid %s for resource provider %r', self.rp_id,
                  rp_name)

    def link_for_rp(self, rp, linktype):
        """Return the URL for a named link relation"""
        for link in rp['links']:
            if link['rel'] == linktype:
                # This link will have the full path as used to access it,
                # but keystoneauth will re-add the baseurl (i.e. /placement)
                # to it
                return link['href'].removeprefix(self.baseurl)

    def _getjson(self, url, **kwargs):
        return self.session.get(
            url, microversion='1.14',
            endpoint_filter={'service_type': 'placement',
                             'interface': self.interface},
            **kwargs).json()

    def _putjson(self, url, data):
        return self.session.put(
            url, json=data, microversion='1.14',
            endpoint_filter={'service_type': 'placement',
                             'interface': self.interface})

    def traits_for_rp(self, rp):
        # NOTE(danms): openstacksdk has no support for resource provider
        # traits, so query directly here.
        url = self.link_for_rp(rp, 'traits')
        return self._getjson(url)['traits']

    def allocations_for_rp(self, rp):
        # NOTE(danms): openstacksdk has no support for resource provider
        # allocations, so query directly here.
        url = self.link_for_rp(rp, 'allocations')
        return self._getjson(url)['allocations']

    def inventories_for_rp(self, rp):
        url = self.link_for_rp(rp, 'inventories')
        return self._getjson(url)

    def rp_inventory_to_clean(self, rp):
        rpinv = self.inventories_for_rp(rp)
        inventory = rpinv['inventories']
        generation = rpinv['resource_provider_generation']
        if len(inventory) != 1:
            # This has OTU traitage, but doesn't look like a PCI-in-placement
            # provider, so skip it for safety.
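            # (With more than one inventory the reserved == total check
            # below would be ambiguous, so only handle single-class
            # providers.)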
            LOG.warning(
                'Skipping provider %s because it has %i inventories',
                rp['uuid'], len(inventory))
            return

        # OTU devices are "burned" when they have reserved == total
        rc = list(inventory.keys())[0]
        is_reserved = inventory[rc]['reserved'] == inventory[rc]['total']
        allocs = self.allocations_for_rp(rp)
        LOG.debug('Allocations for %s: %s', rp['uuid'], allocs)
        LOG.debug('Inventory for %s: %s', rp['uuid'], inventory)

        # A provider needs cleaning if it is reserved and has no allocations
        if is_reserved and not allocs:
            rpinv = RPInventory(self.link_for_rp(rp, 'inventories'),
                                rp['uuid'],
                                generation,
                                rc,
                                inventory[rc]['total'],
                                inventory[rc]['max_unit'])
            return rpinv

    def pci_addr_for_rp(self, rp):
        # This format is defined in the PCI-in-placement work as:
        # hostname_DDDD:BB:SS.FF
        host, addr = rp['name'].split('_')
        return addr

    def unreserve(self, inventory):
        LOG.info('Unreserving %s on %s', inventory.resource_class,
                 inventory.resource_provider_id)
        # NOTE(danms): openstacksdk apparently can't change *just* reserved
        # for a provider inventory. Placement requires total be in the
        # payload, which the sdk will not include if it thinks it is
        # unchanged. So, manually PUT the inventory here.
        url = '/'.join([
            inventory.base_path % {
                'resource_provider_id': inventory.resource_provider_id},
            inventory.resource_class])
        inventory = {
            'resource_provider_generation': (
                inventory.resource_provider_generation),
            'reserved': 0,
            'total': inventory.total,
            'max_unit': inventory.max_unit,
        }
        r = self._putjson(url, inventory)
        r.raise_for_status()

    def run_cleaner(self, rp, pci_addr, cleaner, inventory):
        if self.dryrun:
            LOG.info('Would use %s to clean %s %s',
                     cleaner.__class__.__name__,
                     inventory.resource_provider_id,
                     inventory.resource_class)
            return

        try:
            cleaner.clean(rp, pci_addr)
            self.unreserve(inventory)
        except subprocess.CalledProcessError as e:
            LOG.error('Failed to clean %s (%s) - action returned %i',
                      pci_addr, rp['uuid'], e.returncode)
        except exceptions.HttpError as e:
            LOG.error('Failed to unreserve %s: %s', rp['uuid'], e)
        except Exception as e:
            LOG.exception('Unexpected error cleaning %s (%s): %s',
                          pci_addr, rp['uuid'], e)

    @lockutils.synchronized('cleaner')
    def trigger(self):
        children = self._getjson(
            '/resource_providers',
            params={'in_tree': self.rp_id})['resource_providers']
        cleaners = [cleaner_cls() for cleaner_cls in self.cleaners]
        for child in children:
            is_otu = 'HW_PCI_ONE_TIME_USE' in self.traits_for_rp(child)
            if not is_otu:
                continue
            inventory_to_clean = self.rp_inventory_to_clean(child)
            needs_cleaning = is_otu and inventory_to_clean
            LOG.debug('Provider %s needs cleaning: %s (otu=%s)',
                      child['uuid'], needs_cleaning, is_otu)
            if not needs_cleaning:
                continue
            pci_addr = self.pci_addr_for_rp(child)
            matched_cleaners = [cleaner for cleaner in cleaners
                                if cleaner.match(pci_addr)]
            for cleaner in matched_cleaners:
                self.run_cleaner(child, pci_addr, cleaner,
                                 inventory_to_clean)
            if not matched_cleaners and needs_cleaning:
                # Unsupported device type, don't clean
                LOG.warning('Child provider %s for %s needs cleaning but no '
                            'cleaning implementations matched!',
                            child['uuid'], pci_addr)


class LoggerEndpoint:
    """A notification endpoint that just logs each event received for debug"""

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        LOG.info('Event: %s %s', publisher_id, event_type)


def main():
    p = argparse.ArgumentParser()
    p.add_argument('--topic', default=None,
                   help=('Notification queue. Defaults to '
                         'notifications-$provider_name for recommended '
                         'per-compute topic configuration'))
    p.add_argument('--debug', action='store_true',
                   help='Enable verbose debug logging')
    p.add_argument('--provider-name', default=None,
                   help=('Parent provider name for this compute node '
                         '(if unspecified, attempt to detect)'))
    p.add_argument('--one', action='store_true',
                   help='Run one cleaning pass and exit')
    p.add_argument('--dry-run', action='store_true',
                   help='Do not actually clean, just print what would be '
                        'done')
    p.add_argument('--pool', default=None,
                   help=('oslo.messaging receiver pool name (see caveats '
                         'about this usage)'))
    p.add_argument('--periodic', type=int, default=0,
                   help='Scan for missed required cleaning every N minutes')
    p.add_argument('--nvme', action='store_true',
                   help=('Clean NVMe devices with `nvme format` on all '
                         'namespaces'))
    p.add_argument('--generic-cmd',
                   help=('Run this command for each PCI device needing to '
                         'be cleaned. Arguments are "pci_addr rp_uuid"'))
    args = p.parse_args()

    cfg.CONF([], project='nova')
    auth = loading.load_auth_from_conf_options(cfg.CONF, 'placement')
    sess = loading.load_session_from_conf_options(cfg.CONF, 'placement',
                                                  auth=auth)

    logging.basicConfig(level=args.debug and logging.DEBUG or logging.INFO)

    if not args.provider_name:
        if cfg.CONF.host:
            args.provider_name = cfg.CONF.host
        else:
            args.provider_name = socket.gethostname().split('.')[0]
    if not args.topic:
        args.topic = 'notifications-%s' % args.provider_name

    # This could be dynamic based on arguments to control what things we do
    cleaners = []
    if args.nvme:
        cleaners.append(NVMECleaner)
    if args.generic_cmd:
        cleaners.append(GenericCleaner.for_command(args.generic_cmd))
    if len(cleaners) > 1:
        # NOTE(danms): This should be supportable, but it complicates our
        # "when to unreserve" logic, so just keep this simple at the moment.
        print('Only one type of cleaner may be enabled at a time')
        return 1

    c = CleaningWatcher(sess, args.provider_name, cleaners,
                        dry_run=args.dry_run)

    # Trigger at least once at startup before we potentially start processing
    # notifications
    c.trigger()
    if args.one:
        return

    if args.periodic:
        periodic = threading.Thread(target=periodic_thread,
                                    args=(c, args.periodic * 60))
        periodic.daemon = True
        periodic.start()

    transport = oslo_messaging.get_notification_transport(cfg.CONF)
    targets = [oslo_messaging.Target(topic=args.topic, exchange='nova')]
    endpoints = [c.endpoint]
    if args.debug:
        endpoints.insert(0, LoggerEndpoint())
    server = oslo_messaging.get_notification_listener(transport, targets,
                                                      endpoints,
                                                      pool=args.pool)
    server.start()
    try:
        server.wait()
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    sys.exit(main())