From 28331fee3cff7ac3ccbeb96c22556a5192642309 Mon Sep 17 00:00:00 2001
From: Dan Smith
Date: Tue, 29 Apr 2025 06:55:52 -0700
Subject: [PATCH] Example NVMe cleaning script for one-time-use

This is an example NVMe cleaner script that can listen to notifications
from a per-compute topic and perform basic NVMe (or generic command)
cleaning as a result. It also supports periodic cleaning, notification
pooling (if desired, but not recommended), and single-shot (i.e. cron)
mode.

I ran this while hammering my devstack node in a tight loop of
create/delete with a PCI device. Creates would alternately succeed and
fail while cleaning (which takes a few seconds) was still in progress
from the previous iteration, then recover and repeat: create, delete,
clean, and so on. Dozens of loops in a row produced the expected
results.

Related to blueprint one-time-use-devices

Change-Id: Ie4e81d6319499a3bf12b60e0af4cdb291fca9503
---
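Usage sketch (the flags are the ones the script defines; the paths and
values are illustrative assumptions):

  # One-shot (cron) mode: dry-run first, then actually clean NVMe devices
  ./clean-on-delete.py --one --nvme --dry-run
  ./clean-on-delete.py --one --nvme

  # Daemon mode: listen on this compute's notification topic, and also
  # sweep every hour for anything a missed notification left behind
  ./clean-on-delete.py --nvme --periodic 60

  # Delegate cleaning to a hypothetical site-specific command, invoked
  # as "<cmd> pci_addr rp_uuid" for each device needing cleaning
  ./clean-on-delete.py --one --generic-cmd /usr/local/bin/wipe-device

The script loads nova.conf for the [placement] credentials and the
messaging transport, and `nvme format` needs root, so it is expected
to run as root on the compute node it watches.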
 contrib/clean-on-delete.py | 355 +++++++++++++++++++++++++++++++++++++
 doc/dictionary.txt         |   1 +
 2 files changed, 356 insertions(+)
 create mode 100755 contrib/clean-on-delete.py

diff --git a/contrib/clean-on-delete.py b/contrib/clean-on-delete.py
new file mode 100755
index 000000000000..2a097b18b572
--- /dev/null
+++ b/contrib/clean-on-delete.py
@@ -0,0 +1,355 @@
+#!/usr/bin/env python3
+# Copyright 2025 Red Hat
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import collections
+import logging
+import socket
+import subprocess
+import threading
+import time
+
+from keystoneauth1 import exceptions
+from keystoneauth1 import loading
+from openstack import connection
+from oslo_concurrency import lockutils
+from oslo_config import cfg
+import oslo_messaging
+from oslo_serialization import jsonutils
+
+LOG = logging.getLogger('cleaner')
+loading.register_session_conf_options(cfg.CONF, 'placement')  # noqa
+loading.register_auth_conf_options(cfg.CONF, 'placement')  # noqa
+loading.register_adapter_conf_options(cfg.CONF, 'placement')  # noqa
+cfg.CONF.register_opts([cfg.StrOpt('host')])  # noqa
+
+
+def link_for_rp(rp, linktype):
+    """Return the URL for a named link relation"""
+    for link in rp.links:
+        if link['rel'] == linktype:
+            # This link will have the full path as used to access it,
+            # but keystoneauth will re-add the baseurl (i.e. /placement
+            # to it)
+            return link['href'].split('/', 2)[2]
+
+
+def periodic_thread(cleaner, delay):
+    while True:
+        time.sleep(delay)
+        LOG.info('Running periodic clean')
+        try:
+            cleaner.trigger()
+        except Exception as e:
+            LOG.exception('Failed to run periodic cleaning: %s', e)
+
+
+class CleaningEndpoint:
+    """A notification listener endpoint that triggers the cleaner on delete."""
+    filter_rule = oslo_messaging.NotificationFilter(
+        event_type=r'instance\.delete\.end$')
+
+    def __init__(self, cleaner, publisher_filter):
+        self.cleaner = cleaner
+        self.publisher_filter = publisher_filter
+
+    def info(self, ctxt, publisher_id, event_type, payload, metadata):
+        if publisher_id == self.publisher_filter:
+            LOG.info('Instance deleted on %s', publisher_id)
+            self.cleaner.trigger()
+        else:
+            LOG.debug('Ignoring notification from %s != %s',
+                      publisher_id, self.publisher_filter)
+
+    audit = info
+    debug = info
+    warn = info
+    error = info
+    critical = info
+    sample = info
+
+
+class Cleaner:
+    """Base class for a cleaner for a given device type"""
+
+    def match(self, pci_addr):
+        """Determine if a PCI device matches this type"""
+        return False
+
+    def clean(self, rp, pci_addr):
+        """Clean a specific device"""
+        raise NotImplementedError()
+
+
+class GenericCleaner(Cleaner):
+    def match(self, pci_addr):
+        return True
+
+    def clean(self, rp, pci_addr):
+        cmd = [self.CMD, pci_addr, rp.id]
+        LOG.debug('Running %r', ' '.join(cmd))
+        subprocess.check_call(cmd)
+
+    @classmethod
+    def for_command(cls, cmd):
+        class CommandCleaner(cls):
+            CMD = cmd
+
+        return CommandCleaner
+
+
+class NVMECleaner(Cleaner):
+    def __init__(self):
+        output = subprocess.check_output('nvme list -v -o json', shell=True)
+        data = jsonutils.loads(output)
+        self._nvmes = collections.defaultdict(list)
+        for device in data['Devices']:
+            for controller in device['Controllers']:
+                for ns in controller['Namespaces']:
+                    self._nvmes[controller['Address']].append(ns['NameSpace'])
+        LOG.debug('Found NVMe devices: %s', self._nvmes)
+
+    def match(self, pci_addr):
+        return pci_addr in self._nvmes
+
+    def clean(self, rp, pci_addr):
+        devices = self._nvmes[pci_addr]
+        # NOTE(danms): This may not be good enough (we may need to use the
+        # "sanitize" option), or we may need to provide more options based
+        # on what the NVMe supports.
+        LOG.info('Cleaning %s', ','.join(devices))
+        for nsdev in devices:
+            cmd = 'nvme format -f -s1 /dev/%s' % nsdev
+            LOG.debug('Running %r to clean %s (%s %s)', cmd, nsdev,
+                      rp.id, pci_addr)
+            subprocess.check_call(cmd, shell=True)
+
+
+class CleaningWatcher:
+    def __init__(self, client, rp_name, cleaners, dry_run=False):
+        self.client = client
+        self.endpoint = CleaningEndpoint(self, 'nova-compute:%s' % rp_name)
+        self.rp = client.find_resource_provider(rp_name)
+        self.cleaners = cleaners
+        self.dryrun = dry_run
+
+    def _getjson(self, url):
+        return self.client.session.get(
+            url, microversion='1.6',
+            endpoint_filter={'service_type': 'placement'}).json()
+
+    def _putjson(self, url, data):
+        return self.client.session.put(
+            url, json=data, microversion='1.6',
+            endpoint_filter={'service_type': 'placement'})
+
+    def traits_for_rp(self, rp):
+        # NOTE(danms): openstacksdk has no support for resource provider
+        # traits, so query directly here.
+        url = link_for_rp(rp, 'traits')
+        return self._getjson(url)['traits']
+
+    def allocations_for_rp(self, rp):
+        # NOTE(danms): openstacksdk has no support for resource provider
+        # allocations, so query directly here.
+        url = link_for_rp(rp, 'allocations')
+        return self._getjson(url)['allocations']
+
+    def rp_inventory_to_clean(self, rp):
+        inventory = list(self.client.resource_provider_inventories(rp))
+        if len(inventory) != 1:
+            # This has OTU traitage, but doesn't look like a PCI-in-placement
+            # provider, so skip it for safety.
+            LOG.warning(
+                'Skipping provider %s because it has %i inventories',
+                rp.id, len(inventory))
+            return
+        # OTU devices are "burned" when they have reserved == total
+        is_reserved = inventory[0].reserved == inventory[0].total
+        allocs = self.allocations_for_rp(rp)
+        LOG.debug('Allocations for %s: %s', rp.id, allocs)
+        LOG.debug('Inventory for %s: %s', rp.id, inventory)
+        # A provider needs cleaning if it is reserved and has no allocations
+        if is_reserved and not allocs:
+            return inventory[0]
+
+    def pci_addr_for_rp(self, rp):
+        # This format is defined in the PCI-in-placement work as:
+        # hostname_DDDD:BB:SS.FF (rsplit since the hostname may contain "_")
+        host, addr = rp.name.rsplit('_', 1)
+        return addr
+
+    def unreserve(self, inventory):
+        LOG.info('Unreserving %s on %s', inventory.resource_class,
+                 inventory.resource_provider_id)
+        # NOTE(danms): openstacksdk apparently can't change *just* reserved
+        # for a provider inventory. Placement requires total be in the payload,
+        # which the sdk will not include if it thinks it is unchanged. So,
+        # manually PUT the inventory here.
+        url = '/'.join([
+            inventory.base_path % {
+                'resource_provider_id': inventory.resource_provider_id},
+            inventory.resource_class])
+        payload = {
+            'resource_provider_generation': (
+                inventory.resource_provider_generation),
+            'reserved': 0,
+            'total': inventory.total,
+        }
+
+        r = self._putjson(url, payload)
+        r.raise_for_status()
+
+    def run_cleaner(self, rp, pci_addr, cleaner, inventory):
+        if self.dryrun:
+            LOG.info('Would use %s to clean %s %s',
+                     cleaner.__class__.__name__,
+                     inventory.resource_provider_id,
+                     inventory.resource_class)
+            return
+        try:
+            cleaner.clean(rp, pci_addr)
+            self.unreserve(inventory)
+        except subprocess.CalledProcessError as e:
+            LOG.error('Failed to clean %s (%s) - action returned %i',
+                      pci_addr, rp.id, e.returncode)
+        except exceptions.HttpError as e:
+            LOG.error('Failed to unreserve %s: %s', rp.id, e)
+        except Exception as e:
+            LOG.exception('Unexpected error cleaning %s (%s): %s',
+                          pci_addr, rp.id, e)
+
+    @lockutils.synchronized('cleaner')
+    def trigger(self):
+        children = self.client.resource_providers(in_tree=self.rp.id)
+        children = list(children)
+        cleaners = [cleaner_cls() for cleaner_cls in self.cleaners]
+        for child in children:
+            is_otu = 'HW_PCI_ONE_TIME_USE' in self.traits_for_rp(child)
+            inventory_to_clean = self.rp_inventory_to_clean(child)
+            needs_cleaning = is_otu and inventory_to_clean
+            LOG.debug('Provider %s needs cleaning: %s (otu=%s)',
+                      child.id, needs_cleaning, is_otu)
+            if not needs_cleaning:
+                continue
+            pci_addr = self.pci_addr_for_rp(child)
+            matched_cleaners = [cleaner for cleaner in cleaners
                                if cleaner.match(pci_addr)]
+            for cleaner in matched_cleaners:
+                self.run_cleaner(child, pci_addr, cleaner, inventory_to_clean)
+            if not matched_cleaners:
+                # Unsupported device type, don't clean
+                LOG.warning('Child provider %s for %s needs cleaning but no '
+                            'cleaning implementations matched!',
+                            child.id, pci_addr)
+
+
+class LoggerEndpoint:
+    """A notification endpoint that just logs each event received, for
+    debugging"""
+
+    def info(self, ctxt, publisher_id, event_type, payload, metadata):
+        LOG.info('Event: %s %s', publisher_id, event_type)
+
+
+def main():
+    p = argparse.ArgumentParser()
+    p.add_argument('--topic', default=None,
+                   help=('Notification queue. Defaults to '
+                         'notifications-$provider_name for the recommended '
+                         'per-compute topic configuration'))
+    p.add_argument('--debug', action='store_true',
+                   help='Enable verbose debug logging')
+    p.add_argument('--provider-name', default=None,
+                   help=('Parent provider name for this compute node '
+                         '(if unspecified, attempt to detect)'))
+    p.add_argument('--one', action='store_true',
+                   help='Run one cleaning pass and exit')
+    p.add_argument('--dry-run', action='store_true',
+                   help='Do not actually clean; just print what would be done')
+    p.add_argument('--pool', default=None,
+                   help=('oslo.messaging receiver pool name (see caveats '
+                         'about this usage)'))
+    p.add_argument('--periodic', type=int, default=0,
+                   help='Scan for missed required cleaning every N minutes')
+    p.add_argument('--nvme', action='store_true',
+                   help=('Clean NVMe devices with `nvme format` on all '
+                         'namespaces'))
+    p.add_argument('--generic-cmd',
+                   help=('Run this command for each PCI device needing to be '
+                         'cleaned. Arguments are "pci_addr rp_uuid"'))
+    args = p.parse_args()
+    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
+
+    cfg.CONF([], project='nova')
+
+    auth = loading.load_auth_from_conf_options(cfg.CONF, 'placement')
+    sess = loading.load_session_from_conf_options(cfg.CONF, 'placement',
+                                                  auth=auth)
+    placement = connection.Connection(session=sess,
+                                      oslo_conf=cfg.CONF).placement
+
+    if not args.provider_name:
+        if cfg.CONF.host:
+            args.provider_name = cfg.CONF.host
+        else:
+            args.provider_name = socket.gethostname().split('.')[0]
+    if not args.topic:
+        args.topic = 'notifications-%s' % args.provider_name
+
+    # Build the list of enabled cleaner types based on the arguments
+    cleaners = []
+    if args.nvme:
+        cleaners.append(NVMECleaner)
+    if args.generic_cmd:
+        cleaners.append(GenericCleaner.for_command(args.generic_cmd))
+    if len(cleaners) > 1:
+        # NOTE(danms): This should be supportable, but it complicates our
+        # "when to unreserve" logic, so just keep this simple at the moment.
+        print('Only one type of cleaner may be enabled at a time')
+        return 1
+    c = CleaningWatcher(placement, args.provider_name, cleaners,
+                        dry_run=args.dry_run)
+    # Trigger at least once at startup before we potentially start processing
+    # notifications
+    c.trigger()
+
+    if args.one:
+        return
+
+    if args.periodic:
+        periodic = threading.Thread(target=periodic_thread,
+                                    args=(c, args.periodic * 60))
+        periodic.daemon = True
+        periodic.start()
+
+    transport = oslo_messaging.get_notification_transport(cfg.CONF)
+    targets = [oslo_messaging.Target(topic=args.topic, exchange='nova')]
+    endpoints = [c.endpoint]
+    if args.debug:
+        endpoints.insert(0, LoggerEndpoint())
+
+    server = oslo_messaging.get_notification_listener(transport, targets,
+                                                      endpoints,
+                                                      pool=args.pool)
+    server.start()
+    try:
+        server.wait()
+    except KeyboardInterrupt:
+        pass
+
+
+if __name__ == '__main__':
+    raise SystemExit(main())
diff --git a/doc/dictionary.txt b/doc/dictionary.txt
index ed5e7cb30c48..bd2033ae51a1 100644
--- a/doc/dictionary.txt
+++ b/doc/dictionary.txt
@@ -14,3 +14,4 @@ childs
 assertin
 notin
 OTU
+otu
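
For reference, a sketch of the nova.conf pieces the script consumes on
the compute node (all values are illustrative assumptions; the
[placement] options are the standard keystoneauth ones, and the topics
line shows one way to produce the per-compute topic that the script
listens on by default):

  [DEFAULT]
  host = compute1
  transport_url = rabbit://nova:secret@rabbit.example.com:5672/

  [oslo_messaging_notifications]
  driver = messagingv2
  topics = notifications,notifications-compute1

  [placement]
  auth_type = password
  auth_url = https://keystone.example.com/identity
  username = nova
  password = secret
  project_name = service
  user_domain_name = Default
  project_domain_name = Default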