Files
nova-contrib/clean-on-delete.py
Amit Uniyal 7650481568 Fixes pep8 H904 error
Change-Id: I2eb20d81f45dec8b2b569823c645f906acdb8f79
Signed-off-by: Amit Uniyal <auniyal@redhat.com>
2025-08-05 10:12:47 +05:30

490 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright 2025 Red Hat
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import collections
import json
import logging
import socket
import subprocess
import threading
import time
import urllib
from keystoneauth1 import exceptions
from keystoneauth1 import loading
from oslo_concurrency import lockutils
from oslo_config import cfg
import oslo_messaging
from oslo_serialization import jsonutils
# Logger shared by everything in this script.
LOG = logging.getLogger('cleaner')
# Register the keystoneauth session, auth and adapter options in the
# 'placement' group of the global config object so that main() can load
# placement credentials from configuration.
loading.register_session_conf_options(cfg.CONF, 'placement') # noqa
loading.register_auth_conf_options(cfg.CONF, 'placement') # noqa
loading.register_adapter_conf_options(cfg.CONF, 'placement') # noqa
# Register a top-level string option named 'host'; main() uses it as the
# default provider name when --provider-name is not given.
cfg.CONF.register_opts([cfg.StrOpt('host')]) # noqa
# One NVMe controller: its controller device name, the namespace device
# names associated with it, and the parsed `nvme id-ctrl` data.
NVMeDevice = collections.namedtuple(
    'NVMeDevice',
    ('controller', 'namespaces', 'ctrl_data'))

# The single inventory record of a resource provider that needs cleaning,
# carrying what is required to PUT the inventory back to placement.
RPInventory = collections.namedtuple(
    'RPInventory',
    ('base_path',
     'resource_provider_id',
     'resource_provider_generation',
     'resource_class',
     'total',
     'max_unit'))
# NVMe constants used by NVMECleaner. The NVME_SANICAP_* values are bit
# masks; NVMECleaner tests NVME_SANICAP_CRYPT against the `sanicap` value
# reported for a controller.

# Command to sanitize via crypto
NVME_SANITIZE_CMD_CRYPT = 0x04
# Feature flag indicating crypto sanitize support
NVME_SANICAP_CRYPT = 0x01
# Feature flag indicating block-erase sanitize support
NVME_SANICAP_BLKERASE = 0x02
# Feature flag indicating overwrite sanitize support
NVME_SANICAP_OVERWRITE = 0x04
# Feature flag indicating crypto format support
NVME_FORMAT_CRYPT = 0x04
class CleaningFailed(Exception):
    """Raised when a device could not be cleaned or success is unconfirmed."""
def periodic_thread(cleaner, delay):
    """Trigger ``cleaner`` forever, waiting ``delay`` seconds before each run.

    Intended as a thread target: it never returns. A failing run is logged
    with its traceback and does not stop the loop.

    :param cleaner: object exposing a ``trigger()`` method
    :param delay: number of seconds to sleep before every run
    """
    def _run_once():
        LOG.info('Running periodic clean')
        try:
            cleaner.trigger()
        except Exception as exc:
            LOG.exception('Failed to run periodic cleaning: %s', exc)

    while True:
        time.sleep(delay)
        _run_once()
class CleaningEndpoint:
    """Notification endpoint that runs the cleaner after an instance delete.

    :param cleaner: object whose ``trigger()`` is called for matching events
    :param publisher_filter: only notifications whose publisher id equals
                             this value cause a trigger
    """

    # Restrict delivery to event types ending in 'instance.delete.end'.
    filter_rule = oslo_messaging.NotificationFilter(
        event_type=r'instance\.delete\.end$')

    def __init__(self, cleaner, publisher_filter):
        self.cleaner = cleaner
        self.publisher_filter = publisher_filter

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Trigger a cleaning pass if the event came from our publisher."""
        if publisher_id != self.publisher_filter:
            LOG.debug('Ignoring notification from %s != %s',
                      publisher_id, self.publisher_filter)
            return
        LOG.info('Instance deleted on %s', publisher_id)
        self.cleaner.trigger()

    # Every notification priority is handled exactly like 'info'.
    audit = debug = warn = error = critical = sample = info
class Cleaner:
    """Interface shared by the cleaners for each supported device type."""

    def match(self, pci_addr):
        """Tell whether this cleaner handles the device at ``pci_addr``.

        The base implementation matches nothing.
        """
        return False

    def clean(self, rp, pci_addr):
        """Clean the device at ``pci_addr`` that backs provider ``rp``.

        Must be overridden; the base implementation always raises
        NotImplementedError.
        """
        raise NotImplementedError
class GenericCleaner(Cleaner):
    """Cleaner that hands every device to an external command.

    The command is read from the ``CMD`` class attribute, which this class
    does not define itself; use :meth:`for_command` to get a subclass that
    has it set.
    """

    def match(self, pci_addr):
        """Match every PCI address."""
        return True

    def clean(self, rp, pci_addr):
        """Run ``CMD <pci_addr> <rp uuid>``; a non-zero exit raises
        subprocess.CalledProcessError.
        """
        argv = [self.CMD, pci_addr, rp['uuid']]
        LOG.debug('Running %r', ' '.join(argv))
        subprocess.check_call(argv)

    @classmethod
    def for_command(cls, cmd):
        """Return a subclass of ``cls`` whose ``CMD`` is ``cmd``."""
        class CommandCleaner(cls):
            CMD = cmd

        return CommandCleaner
class NVMECleaner(Cleaner):
    """Cleaner for NVMe devices driven by the ``nvme`` command line utility.

    On construction the NVMe controllers are enumerated with ``nvme list``
    and indexed by upper-cased PCI address. A device is cleaned with a
    crypto sanitize when its controller reports support for it in
    ``sanicap``, and with ``nvme format`` on each namespace otherwise.
    """

    @staticmethod
    def rungetjson(cmd):
        """Run ``cmd`` through the shell and return its parsed JSON output.

        :param cmd: the full command line as a single string
        :returns: the decoded JSON document
        :raises CleaningFailed: if the command exits non-zero or its output
                                cannot be parsed as JSON
        """
        try:
            output = subprocess.check_output(cmd, shell=True)
        except subprocess.CalledProcessError as e:
            LOG.error('Failed to run %r: %s', cmd, e)
            raise CleaningFailed('Failed to run nvme utility') from e
        try:
            return jsonutils.loads(output)
        except json.JSONDecodeError as e:
            LOG.debug('Raw output from %r: %r', cmd, output)
            LOG.error('Failed to parse nvme utility output from %r: %s',
                      cmd, e)
            raise CleaningFailed(
                'Failed to parse nvme utility output') from e

    def __init__(self):
        """Discover the NVMe controllers and their namespaces.

        :raises CleaningFailed: if an ``nvme`` invocation fails or returns
                                output that is not valid JSON
        """
        data = self.rungetjson('nvme list -v -o json')
        # Maps upper-cased controller address -> NVMeDevice
        self._nvmes = {}
        for device in data['Devices']:
            # An entry without a 'Subsystems' key is treated as being its
            # own (single) subsystem.
            subs = device.get('Subsystems', [device])
            for sub in subs:
                # v2 output has namespaces at the subsystem level
                subsys_ns = [ns['NameSpace']
                             for ns in sub.get('Namespaces', [])]
                for controller in sub['Controllers']:
                    ctrl_data = self.rungetjson(
                        'nvme id-ctrl -o json /dev/%s' % (
                            controller['Controller']))
                    ctrl_ns = [ns['NameSpace']
                               for ns in controller['Namespaces']]
                    self._nvmes[controller['Address'].upper()] = NVMeDevice(
                        controller['Controller'], subsys_ns + ctrl_ns,
                        ctrl_data)
        LOG.debug('Found NVMe devices: %s', self._nvmes)

    def match(self, pci_addr):
        """Return True if ``pci_addr`` is one of the discovered controllers."""
        return pci_addr.upper() in self._nvmes

    def _clean_with_format(self, nvme):
        """Run ``nvme format`` on every namespace of ``nvme``.

        :returns: True only if at least one namespace was formatted and
                  every known namespace was formatted
        :raises CleaningFailed: if the device reports no namespaces
        :raises subprocess.CalledProcessError: if a format command fails
        """
        nn = nvme.ctrl_data.get('nn', 0)
        if nn != 1:
            # If the device supports more than one namespace, there
            # may be some data residue outside the current set of
            # configured namespaces that we will not clean
            LOG.warning('NVMe device %s reports %i namespaces '
                        'supported; cleaning may not be complete!',
                        nvme.controller, nn)
        if not nvme.namespaces:
            # The device name must be passed explicitly; without an
            # argument the '%s' placeholder is emitted verbatim.
            LOG.error('Device %s does not report any namespaces; '
                      'will not clean!', nvme.controller)
            raise CleaningFailed('Device has no namespaces')
        cleaned = 0
        for nsdev in nvme.namespaces:
            cmd = 'nvme format -f -s1 /dev/%s' % nsdev
            LOG.debug('Running %r to clean %s', cmd, nsdev)
            subprocess.check_call(cmd, shell=True)
            cleaned += 1
        LOG.debug('Format complete for %s', ','.join(nvme.namespaces))
        # This is a bit silly since we checked nvme.namespaces above, but
        # be positive about what cleaning success means so that if this code
        # changes in the future we won't end up with a falls-out-the-bottom
        # "success".
        # NOTE(danms): There is no guarantee that the device is fully covered
        # by active namespaces and this could result in data being leaked
        # if, for example, a user deleted some namespaces before releasing
        # the device without cleaning it themselves first. Sanitize is the
        # much more robust mechanism for this type of multi-tenant scenario!
        return cleaned > 0 and cleaned == len(nvme.namespaces)

    def _clean_with_sanitize(self, nvme):
        """Start a crypto sanitize on the controller and wait for it.

        The sanitize log is polled up to ten times, sleeping 1, 2, ... 10
        seconds before each poll.

        :returns: True once the log reports ``global_erased`` as 1
        :raises CleaningFailed: if completion is not seen within the polls,
                                or the sanitize log cannot be read
        :raises subprocess.CalledProcessError: if starting sanitize fails
        """
        # NOTE(danms): Theoretically we could support non-crypto sanitize
        # here, but it seems like everything that supports sanitize is also
        # likely to support the crypto mechanism which is better anyway.
        cmd = 'nvme sanitize -a 0x%02x /dev/%s' % (NVME_SANITIZE_CMD_CRYPT,
                                                   nvme.controller)
        subprocess.check_call(cmd, shell=True)
        for i in range(10):
            time.sleep(i + 1)
            data = self.rungetjson(
                'nvme sanitize-log -o json /dev/%s' % nvme.controller)
            if data[nvme.controller]['sstat'].get('global_erased') == 1:
                LOG.info('Sanitization complete for %s', nvme.controller)
                return True
            LOG.debug('Waiting for sanitize to complete on %s',
                      nvme.controller)
        raise CleaningFailed(
            'Timed out waiting for sanitize operation on %s' % (
                nvme.controller))

    def clean(self, rp, pci_addr):
        """Clean the NVMe device at ``pci_addr``.

        :param rp: the resource provider for the device (not used here)
        :param pci_addr: PCI address; must be one this cleaner matched
        :raises CleaningFailed: unless the chosen method returned True
        """
        nvme = self._nvmes[pci_addr.upper()]
        sanicap = nvme.ctrl_data.get('sanicap', 0)
        if sanicap & NVME_SANICAP_CRYPT:
            LOG.info('Cleaning %s with crypto sanitize', nvme.controller)
            result = self._clean_with_sanitize(nvme)
        else:
            LOG.info('Cleaning %s with format', nvme.controller)
            result = self._clean_with_format(nvme)
        if result is not True:
            raise CleaningFailed('Cleaning did not positively signal success')
class CleaningWatcher:
    """Find one-time-use PCI providers that need cleaning and clean them.

    A provider needs cleaning when it carries the HW_PCI_ONE_TIME_USE
    trait, has exactly one inventory whose ``reserved`` equals ``total``,
    and has no allocations. After a successful clean the inventory is
    unreserved in placement.

    :param session: keystoneauth session used to talk to placement
    :param rp_name: name of the compute node's root resource provider
    :param cleaners: list of Cleaner classes; each is instantiated anew on
                     every :meth:`trigger`
    :param dry_run: if True, only log what would be cleaned
    :raises RuntimeError: if ``rp_name`` does not resolve to exactly one
                          resource provider
    """

    def __init__(self, session, rp_name, cleaners, dry_run=False):
        self.session = session
        self.endpoint = CleaningEndpoint(self, 'nova-compute:%s' % rp_name)
        self.cleaners = cleaners
        self.dryrun = dry_run
        self.interface = cfg.CONF.placement.valid_interfaces[0]
        ep = session.get_endpoint(
            session.auth,
            service_type='placement',
            interface=self.interface)
        # Only the path component is kept; link_for_rp() strips it from
        # the hrefs returned by placement.
        self.baseurl = urllib.parse.urlparse(ep).path
        LOG.debug('Placement endpoint baseurl is %s', self.baseurl)
        rps = self._getjson('/resource_providers',
                            params={'name': rp_name})['resource_providers']
        if len(rps) != 1:
            raise RuntimeError('Failed to find a resource provider for %r' % (
                rp_name))
        self.rp_id = rps[0]['uuid']
        LOG.debug('Found uuid %s for resource provider %r',
                  self.rp_id, rp_name)

    def link_for_rp(self, rp, linktype):
        """Return the URL for a named link relation.

        :returns: the link href with the placement base path removed, or
                  None if ``rp`` has no link with relation ``linktype``
        """
        for link in rp['links']:
            if link['rel'] == linktype:
                # This link will have the full path as used to access it,
                # but keystoneauth will re-add the baseurl (i.e. /placement
                # to it)
                return link['href'].removeprefix(self.baseurl)
        return None

    def _getjson(self, url, **kwargs):
        """GET ``url`` from placement and return the decoded JSON body."""
        return self.session.get(
            url, microversion='1.14',
            endpoint_filter={'service_type': 'placement',
                             'interface': self.interface},
            **kwargs).json()

    def _putjson(self, url, data):
        """PUT ``data`` as JSON to placement and return the response."""
        return self.session.put(
            url, json=data, microversion='1.14',
            endpoint_filter={'service_type': 'placement',
                             'interface': self.interface})

    def traits_for_rp(self, rp):
        """Return the list of traits set on ``rp``."""
        # NOTE(danms): openstacksdk has no support for resource provider
        # traits, so query directly here.
        url = self.link_for_rp(rp, 'traits')
        return self._getjson(url)['traits']

    def allocations_for_rp(self, rp):
        """Return the allocations currently held against ``rp``."""
        # NOTE(danms): openstacksdk has no support for resource provider
        # allocations, so query directly here.
        url = self.link_for_rp(rp, 'allocations')
        return self._getjson(url)['allocations']

    def inventories_for_rp(self, rp):
        """Return the raw inventories document for ``rp``."""
        url = self.link_for_rp(rp, 'inventories')
        return self._getjson(url)

    def rp_inventory_to_clean(self, rp):
        """Return the inventory of ``rp`` that needs cleaning, if any.

        :returns: an RPInventory when the provider's only inventory is
                  fully reserved and has no allocations, otherwise None
        """
        rpinv = self.inventories_for_rp(rp)
        inventory = rpinv['inventories']
        generation = rpinv['resource_provider_generation']
        if len(inventory) != 1:
            # This has OTU traitage, but doesn't look like a PCI-in-placement
            # provider, so skip it for safety.
            LOG.warning(
                'Skipping provider %s because it has %i inventories',
                rp['uuid'], len(inventory))
            return None
        # OTU devices are "burned" when they have reserved == total
        rc = next(iter(inventory))
        is_reserved = inventory[rc]['reserved'] == inventory[rc]['total']
        allocs = self.allocations_for_rp(rp)
        LOG.debug('Allocations for %s: %s', rp['uuid'], allocs)
        LOG.debug('Inventory for %s: %s', rp['uuid'], inventory)
        # A provider needs cleaning if it is reserved and has no allocations
        if is_reserved and not allocs:
            return RPInventory(self.link_for_rp(rp, 'inventories'),
                               rp['uuid'],
                               generation, rc,
                               inventory[rc]['total'],
                               inventory[rc]['max_unit'])
        return None

    def pci_addr_for_rp(self, rp):
        """Extract the PCI address from the name of provider ``rp``.

        :raises ValueError: if the name contains no underscore
        """
        # This format is defined in the PCI-in-placement work as:
        # hostname_DDDD:BB:SS.FF
        # Split on the LAST underscore only: the address part never
        # contains one, but the hostname part may.
        _host, addr = rp['name'].rsplit('_', 1)
        return addr

    def unreserve(self, inventory):
        """Set ``reserved`` back to 0 for ``inventory`` in placement.

        :param inventory: the RPInventory returned by
                          :meth:`rp_inventory_to_clean`
        """
        LOG.info('Unreserving %s on %s', inventory.resource_class,
                 inventory.resource_provider_id)
        # NOTE(danms): openstacksdk apparently can't change *just* reserved
        # for a provider inventory. Placement requires total be in the payload,
        # which the sdk will not include if it thinks it is unchanged. So,
        # manually PUT the inventory here.
        url = '/'.join([
            inventory.base_path % {
                'resource_provider_id': inventory.resource_provider_id},
            inventory.resource_class])
        payload = {
            'resource_provider_generation': (
                inventory.resource_provider_generation),
            'reserved': 0,
            'total': inventory.total,
            'max_unit': inventory.max_unit,
        }
        r = self._putjson(url, payload)
        r.raise_for_status()

    def run_cleaner(self, rp, pci_addr, cleaner, inventory):
        """Clean one device and unreserve it, logging any failure.

        Errors are logged and never propagated, so one bad device does not
        abort the rest of the pass. The inventory is only unreserved when
        ``cleaner.clean()`` returned without raising. In dry-run mode
        nothing is cleaned or unreserved.
        """
        if self.dryrun:
            LOG.info('Would use %s to clean %s %s',
                     cleaner.__class__.__name__,
                     inventory.resource_provider_id,
                     inventory.resource_class)
            return
        try:
            cleaner.clean(rp, pci_addr)
            self.unreserve(inventory)
        except subprocess.CalledProcessError as e:
            LOG.error('Failed to clean %s (%s) - action returned %i',
                      pci_addr, rp['uuid'], e.returncode)
        except exceptions.HttpError as e:
            LOG.error('Failed to unreserve %s: %s', rp['uuid'], e)
        except Exception as e:
            LOG.exception('Unexpected error cleaning %s (%s): %s',
                          pci_addr, rp['uuid'], e)

    @lockutils.synchronized('cleaner')
    def trigger(self):
        """Run one cleaning pass over every provider in our tree.

        Guarded by the 'cleaner' lock. Cleaner instances are created fresh
        for each pass.
        """
        children = self._getjson(
            '/resource_providers',
            params={'in_tree': self.rp_id})['resource_providers']
        cleaners = [cleaner_cls() for cleaner_cls in self.cleaners]
        for child in children:
            is_otu = 'HW_PCI_ONE_TIME_USE' in self.traits_for_rp(child)
            if not is_otu:
                continue
            inventory_to_clean = self.rp_inventory_to_clean(child)
            LOG.debug('Provider %s needs cleaning: %s (otu=%s)',
                      child['uuid'], inventory_to_clean, is_otu)
            if not inventory_to_clean:
                continue
            pci_addr = self.pci_addr_for_rp(child)
            matched_cleaners = [cleaner for cleaner in cleaners
                                if cleaner.match(pci_addr)]
            if not matched_cleaners:
                # Unsupported device type, don't clean
                LOG.warning('Child provider %s for %s needs cleaning but no '
                            'cleaning implementations matched!',
                            child['uuid'], pci_addr)
                continue
            for cleaner in matched_cleaners:
                self.run_cleaner(child, pci_addr, cleaner, inventory_to_clean)
class LoggerEndpoint:
    """Debugging aid: a notification endpoint that logs what it receives."""

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Log the publisher and event type of an 'info' notification."""
        LOG.info('Event: %s %s', publisher_id, event_type)
def main():
    """Parse arguments, run an initial cleaning pass, then listen for deletes.

    :returns: 1 if more than one cleaner type was requested, otherwise None
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--topic', default=None,
        help=('Notification queue. Defaults to '
              'notifications-$provider_name for recommended '
              'per-compute topic configuration'))
    parser.add_argument(
        '--debug', action='store_true',
        help='Enable verbose debug logging')
    parser.add_argument(
        '--provider-name', default=None,
        help=('Parent provider name for this compute node '
              '(if unspecified, attempt to detect)'))
    parser.add_argument(
        '--one', action='store_true',
        help='Run one cleaning pass and exit')
    parser.add_argument(
        '--dry-run', action='store_true',
        help='Do not actually clean, just print what would be done')
    parser.add_argument(
        '--pool', default=None,
        help=('oslo.messaging receiver pool name (see caveats '
              'about this usage)'))
    parser.add_argument(
        '--periodic', type=int, default=0,
        help='Scan for missed required cleaning every N minutes')
    parser.add_argument(
        '--nvme', action='store_true',
        help=('Clean NVMe devices with `nvme format` on all '
              'namespaces'))
    parser.add_argument(
        '--generic-cmd',
        help=('Run this command for each PCI device needing to be '
              'cleaned. Arguments are "pci_addr rp_uuid"'))
    args = parser.parse_args()

    # Load configuration as the 'nova' project and build the session used
    # to talk to placement.
    cfg.CONF([], project='nova')
    auth = loading.load_auth_from_conf_options(cfg.CONF, 'placement')
    session = loading.load_session_from_conf_options(
        cfg.CONF, 'placement', auth=auth)

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO)

    # Fall back to the configured host, then to the short local hostname.
    if not args.provider_name:
        args.provider_name = (
            cfg.CONF.host or socket.gethostname().split('.')[0])
    if not args.topic:
        args.topic = 'notifications-%s' % args.provider_name

    # This could be dynamic based on arguments to control what things we do
    cleaners = []
    if args.nvme:
        cleaners.append(NVMECleaner)
    if args.generic_cmd:
        cleaners.append(GenericCleaner.for_command(args.generic_cmd))
    if len(cleaners) > 1:
        # NOTE(danms): This should be supportable, but it complicates our
        # "when to unreserve" logic, so just keep this simple at the moment.
        print('Only one type of cleaner may be enabled at a time')
        return 1

    watcher = CleaningWatcher(session, args.provider_name, cleaners,
                              dry_run=args.dry_run)
    # Trigger at least once at startup before we potentially start processing
    # notifications
    watcher.trigger()
    if args.one:
        return None

    if args.periodic:
        # --periodic is given in minutes; the thread sleeps in seconds.
        threading.Thread(target=periodic_thread,
                         args=(watcher, args.periodic * 60),
                         daemon=True).start()

    transport = oslo_messaging.get_notification_transport(cfg.CONF)
    targets = [oslo_messaging.Target(topic=args.topic, exchange='nova')]
    endpoints = [watcher.endpoint]
    if args.debug:
        endpoints.insert(0, LoggerEndpoint())
    server = oslo_messaging.get_notification_listener(
        transport, targets, endpoints, pool=args.pool)
    server.start()
    try:
        server.wait()
    except KeyboardInterrupt:
        pass
if __name__ == '__main__':
    # main() returns 1 when the requested options are rejected and None
    # otherwise. Raising SystemExit with that value makes it the process
    # exit status (None means success) instead of always exiting with 0.
    raise SystemExit(main())