Merge "typing: Fixups for typed osc-lib"

This commit is contained in:
Zuul
2025-12-16 18:13:51 +00:00
committed by Gerrit Code Review
19 changed files with 165 additions and 62 deletions

View File

@@ -15,15 +15,25 @@
"""Manage access to the clients, including authenticating when needed."""
import argparse
from collections.abc import Callable
import importlib
import logging
import sys
import typing as ty
from osc_lib.cli import client_config
from osc_lib import clientmanager
from osc_lib import shell
import stevedore
if ty.TYPE_CHECKING:
from keystoneauth1 import access as ksa_access
from openstack.compute.v2 import _proxy as compute_proxy
from openstack.image.v2 import _proxy as image_proxy
from openstack.network.v2 import _proxy as network_proxy
from openstackclient.api import object_store_v1
LOG = logging.getLogger(__name__)
@@ -40,6 +50,24 @@ class ClientManager(clientmanager.ClientManager):
in osc-lib so we need to maintain a transition period.
"""
if ty.TYPE_CHECKING:
# we know this will be set by us and will not be nullable
auth_ref: ksa_access.AccessInfo
# this is a hack to keep mypy happy: the actual attributes are set in
# get_plugin_modules below
# TODO(stephenfin): Change the types of identity and volume once we've
# migrated everything to SDK. Hopefully by then we'll have figured out
# how to statically distinguish between the v2 and v3 versions of both
# services...
# TODO(stephenfin): We also need to migrate object storage...
compute: compute_proxy.Proxy
identity: ty.Any
image: image_proxy.Proxy
network: network_proxy.Proxy
object_store: object_store_v1.APIv1
volume: ty.Any
def __init__(
self,
cli_options=None,
@@ -75,6 +103,12 @@ class ClientManager(clientmanager.ClientManager):
self._auth_required
and self._cli_options._openstack_config is not None
):
if not isinstance(
self._cli_options._openstack_config, client_config.OSC_Config
):
# programmer error
raise TypeError('unexpected type for _openstack_config')
self._cli_options._openstack_config._pw_callback = (
shell.prompt_for_password
)
@@ -101,6 +135,13 @@ class ClientManager(clientmanager.ClientManager):
self._cli_options.config['auth_type'] = self._original_auth_type
del self._cli_options.config['auth']['token']
del self._cli_options.config['auth']['endpoint']
if not isinstance(
self._cli_options._openstack_config, client_config.OSC_Config
):
# programmer error
raise TypeError('unexpected type for _openstack_config')
self._cli_options._auth = (
self._cli_options._openstack_config.load_auth_plugin(
self._cli_options.config,
@@ -138,11 +179,25 @@ class ClientManager(clientmanager.ClientManager):
# Plugin Support
ArgumentParserT = ty.TypeVar('ArgumentParserT', bound=argparse.ArgumentParser)
@ty.runtime_checkable # Optional: allows usage with isinstance()
class PluginModule(ty.Protocol):
DEFAULT_API_VERSION: str
API_VERSION_OPTION: str
API_NAME: str
API_VERSIONS: tuple[str]
make_client: Callable[..., ty.Any]
build_option_parser: Callable[[ArgumentParserT], ArgumentParserT]
check_api_version: Callable[[str], bool]
def _on_load_failure_callback(
manager: stevedore.ExtensionManager,
ep: importlib.metadata.EntryPoint,
err: Exception,
err: BaseException,
) -> None:
sys.stderr.write(
f"WARNING: Failed to import plugin {ep.group}:{ep.name}: {err}.\n"
@@ -152,6 +207,7 @@ def _on_load_failure_callback(
def get_plugin_modules(group):
"""Find plugin entry points"""
mod_list = []
mgr: stevedore.ExtensionManager[PluginModule]
mgr = stevedore.ExtensionManager(
group, on_load_failure_callback=_on_load_failure_callback
)
@@ -164,8 +220,8 @@ def get_plugin_modules(group):
module = importlib.import_module(module_name)
except Exception as err:
sys.stderr.write(
f"WARNING: Failed to import plugin {ep.group}:{ep.name}: "
f"{err}.\n"
f"WARNING: Failed to import plugin "
f"{ep.module_name}:{ep.name}: {err}.\n"
)
continue

View File

@@ -48,7 +48,9 @@ class ListCommand(command.Lister):
columns = ('Command Group', 'Commands')
if parsed_args.group:
groups = (group for group in groups if parsed_args.group in group)
groups = sorted(
group for group in groups if parsed_args.group in group
)
commands = []
for group in groups:

View File

@@ -19,6 +19,7 @@
import logging
import typing as ty
from cliff import columns
from openstack import utils as sdk_utils
from osc_lib.cli import format_columns
from osc_lib.cli import parseractions
@@ -32,7 +33,7 @@ from openstackclient.i18n import _
LOG = logging.getLogger(__name__)
_aggregate_formatters = {
_aggregate_formatters: dict[str, type[columns.FormattableColumn[ty.Any]]] = {
'Hosts': format_columns.ListColumn,
'Metadata': format_columns.DictColumn,
'hosts': format_columns.ListColumn,

View File

@@ -64,13 +64,15 @@ class ShowConsoleLog(command.Command):
output = compute_client.get_server_console_output(
server.id, length=parsed_args.lines
)
data = None
data: str | None = None
if output:
data = output.get('output', None)
if data and data[-1] != '\n':
data += '\n'
self.app.stdout.write(data)
if data:
self.app.stdout.write(data)
class ShowConsoleURL(command.ShowOne):

View File

@@ -16,7 +16,9 @@
"""Compute v2 Server Group action implementations"""
import logging
import typing as ty
from cliff import columns
from openstack import utils as sdk_utils
from osc_lib.cli import format_columns
from osc_lib.cli import parseractions
@@ -30,7 +32,7 @@ from openstackclient.i18n import _
LOG = logging.getLogger(__name__)
_formatters = {
_formatters: dict[str, type[columns.FormattableColumn[ty.Any]]] = {
'member_ids': format_columns.ListColumn,
'policies': format_columns.ListColumn,
'rules': format_columns.DictColumn,

View File

@@ -68,7 +68,7 @@ class ListRoleAssignment(command.Lister):
parsed_args.user,
)
elif parsed_args.authuser:
if auth_ref:
if auth_ref and auth_ref.user_id:
user = utils.find_resource(
identity_client.users, auth_ref.user_id
)

View File

@@ -44,7 +44,11 @@ class DeleteAccessRule(command.Command):
def take_action(self, parsed_args):
identity_client = self.app.client_manager.sdk_connection.identity
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
auth = conn.config.get_auth()
if auth is None:
# this will never happen
raise exceptions.CommandError('invalid authentication info')
user_id = auth.get_user_id(conn.identity)
errors = 0
for ac in parsed_args.access_rule:
@@ -87,7 +91,11 @@ class ListAccessRule(command.Lister):
).id
else:
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
auth = conn.config.get_auth()
if auth is None:
# this will never happen
raise exceptions.CommandError('invalid authentication info')
user_id = auth.get_user_id(conn.identity)
columns = ('ID', 'Service', 'Method', 'Path')
data = identity_client.access_rules(user=user_id)
@@ -119,7 +127,11 @@ class ShowAccessRule(command.ShowOne):
def take_action(self, parsed_args):
identity_client = self.app.client_manager.sdk_connection.identity
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
auth = conn.config.get_auth()
if auth is None:
# this will never happen
raise exceptions.CommandError('invalid authentication info')
user_id = auth.get_user_id(conn.identity)
access_rule = identity_client.get_access_rule(
user_id, parsed_args.access_rule

View File

@@ -206,7 +206,12 @@ class CreateApplicationCredential(command.ShowOne):
def take_action(self, parsed_args):
identity_client = self.app.client_manager.sdk_connection.identity
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
auth = conn.config.get_auth()
if auth is None:
# this will never happen
raise exceptions.CommandError('invalid authentication info')
user_id = auth.get_user_id(conn.identity)
role_ids = []
for role in parsed_args.roles:
@@ -274,7 +279,12 @@ class DeleteApplicationCredential(command.Command):
def take_action(self, parsed_args):
identity_client = self.app.client_manager.sdk_connection.identity
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
auth = conn.config.get_auth()
if auth is None:
# this will never happen
raise exceptions.CommandError('invalid authentication info')
user_id = auth.get_user_id(conn.identity)
errors = 0
for ac in parsed_args.application_credential:
@@ -327,7 +337,11 @@ class ListApplicationCredential(command.Lister):
)
else:
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
auth = conn.config.get_auth()
if auth is None:
# this will never happen
raise exceptions.CommandError('invalid authentication info')
user_id = auth.get_user_id(conn.identity)
application_credentials = identity_client.application_credentials(
user=user_id
@@ -351,7 +365,11 @@ class ShowApplicationCredential(command.ShowOne):
def take_action(self, parsed_args):
identity_client = self.app.client_manager.sdk_connection.identity
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
auth = conn.config.get_auth()
if auth is None:
# this will never happen
raise exceptions.CommandError('invalid authentication info')
user_id = auth.get_user_id(conn.identity)
application_credential = identity_client.find_application_credential(
user_id, parsed_args.application_credential, ignore_missing=False

View File

@@ -137,8 +137,9 @@ class CreateIdentityProvider(command.ShowOne):
)
idp._info.pop('links', None)
remote_ids = format_columns.ListColumn(idp._info.pop('remote_ids', []))
idp._info['remote_ids'] = remote_ids
idp._info['remote_ids'] = format_columns.ListColumn(
idp._info.pop('remote_ids', [])
)
return zip(*sorted(idp._info.items()))

View File

@@ -693,7 +693,12 @@ class SetPasswordUser(command.Command):
def take_action(self, parsed_args):
identity_client = self.app.client_manager.sdk_connection.identity
conn = self.app.client_manager.sdk_connection
user_id = conn.config.get_auth().get_user_id(conn.identity)
auth = conn.config.get_auth()
if auth is None:
# this will never happen
raise exceptions.CommandError('invalid authentication info')
user_id = auth.get_user_id(conn.identity)
# FIXME(gyee): there are two scenarios:
#

View File

@@ -19,6 +19,7 @@ import argparse
import logging
import os
import sys
import typing as ty
from cliff import columns as cliff_columns
from osc_lib.api import utils as api_utils
@@ -67,9 +68,6 @@ def _get_columns(item):
)
_formatters = {}
class HumanReadableSizeColumn(cliff_columns.FormattableColumn[int]):
def human_readable(self):
"""Return a formatted visibility string
@@ -340,9 +338,12 @@ class CreateImage(command.ShowOne):
if image:
display_columns, columns = _get_columns(image)
_formatters['properties'] = format_columns.DictColumn
data = utils.get_item_properties(
image, columns, formatters=_formatters
image,
columns,
formatters={
'properties': format_columns.DictColumn,
},
)
return (display_columns, data)
elif info:
@@ -493,19 +494,19 @@ class ListImage(command.Lister):
column_headers = columns
# List of image data received
data = list(image_client.images(**kwargs))
images = list(image_client.images(**kwargs))
if parsed_args.property:
# NOTE(dtroyer): coerce to a list to subscript it in py3
attr, value = list(parsed_args.property.items())[0]
api_utils.simple_filter(
data,
images,
attr=attr,
value=value,
property_field='properties',
)
data = utils.sort_items(data, parsed_args.sort)
data = utils.sort_items(images, parsed_args.sort)
return (
column_headers,
@@ -839,11 +840,13 @@ class ShowImage(command.ShowOne):
parsed_args.image, ignore_missing=False
)
formatters: dict[
str, type[cliff_columns.FormattableColumn[ty.Any]]
] = {
'properties': format_columns.DictColumn,
}
if parsed_args.human_readable:
_formatters['size'] = HumanReadableSizeColumn
formatters['size'] = HumanReadableSizeColumn
display_columns, columns = _get_columns(image)
_formatters['properties'] = format_columns.DictColumn
data = utils.get_item_properties(
image, columns, formatters=_formatters
)
data = utils.get_item_properties(image, columns, formatters=formatters)
return (display_columns, data)

View File

@@ -551,7 +551,7 @@ class CreateImage(command.ShowOne):
sign_cert_id = parsed_args.sign_cert_id
signer = image_signer.ImageSigner()
try:
pw = utils.get_password(
pw: str | None = utils.get_password(
self.app.stdin,
prompt=(
"Please enter private key password, leave "
@@ -562,12 +562,11 @@ class CreateImage(command.ShowOne):
if not pw or len(pw) < 1:
pw = None
else:
# load_private_key() requires the password to be
# passed as bytes
pw = pw.encode()
signer.load_private_key(sign_key_path, password=pw)
signer.load_private_key(
sign_key_path,
password=pw.encode() if pw else None,
)
except Exception:
msg = _(
"Error during sign operation: private key "
@@ -933,18 +932,19 @@ class ListImage(command.Lister):
if 'limit' in kwargs:
# Disable automatic pagination in SDK
kwargs['paginated'] = False
data = list(image_client.images(**kwargs))
images = list(image_client.images(**kwargs))
if parsed_args.property:
for attr, value in parsed_args.property.items():
api_utils.simple_filter(
data,
images,
attr=attr,
value=value,
property_field='properties',
)
data = utils.sort_items(data, parsed_args.sort, str)
data = utils.sort_items(images, parsed_args.sort, str)
return (
column_headers,

View File

@@ -24,7 +24,6 @@ from osc_lib import exceptions
from openstackclient import command
from openstackclient.i18n import _
from openstackclient.network import utils
from openstackclient import shell
LOG = logging.getLogger(__name__)
@@ -68,8 +67,6 @@ class NetDetectionMixin(metaclass=abc.ABCMeta):
present the options for both network types, often qualified accordingly.
"""
app: shell.OpenStackShell
@property
def _network_type(self):
"""Discover whether the running cloud is using neutron or nova-network.
@@ -84,7 +81,7 @@ class NetDetectionMixin(metaclass=abc.ABCMeta):
# Have we set it up yet for this command?
if not hasattr(self, '_net_type'):
try:
if self.app.client_manager.is_network_endpoint_enabled():
if self.app.client_manager.is_network_endpoint_enabled(): # type: ignore
net_type = _NET_TYPE_NEUTRON
else:
net_type = _NET_TYPE_COMPUTE
@@ -163,11 +160,13 @@ class NetDetectionMixin(metaclass=abc.ABCMeta):
def take_action(self, parsed_args):
if self.is_neutron:
return self.take_action_network(
self.app.client_manager.network, parsed_args
self.app.client_manager.network, # type: ignore
parsed_args,
)
elif self.is_nova_network:
return self.take_action_compute(
self.app.client_manager.compute, parsed_args
self.app.client_manager.compute, # type: ignore
parsed_args,
)
def take_action_network(self, client, parsed_args):

View File

@@ -25,13 +25,12 @@ from openstackclient.i18n import _
from openstackclient.identity import common
from openstackclient.network.v2.taas import tap_service
LOG = logging.getLogger(__name__)
TAP_FLOW = 'tap_flow'
TAP_FLOWS = f'{TAP_FLOW}s'
_attr_map = (
_attr_map = [
('id', 'ID', column_util.LIST_BOTH),
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
('name', 'Name', column_util.LIST_BOTH),
@@ -39,7 +38,7 @@ _attr_map = (
('source_port', 'source_port', column_util.LIST_BOTH),
('tap_service_id', 'tap_service_id', column_util.LIST_BOTH),
('direction', 'Direction', column_util.LIST_BOTH),
)
]
_formatters = {
'vlan_filter': format_columns.ListColumn,

View File

@@ -23,13 +23,12 @@ from openstackclient.identity import common
from openstackclient.network.v2 import port as osc_port
from openstackclient.network.v2.taas import tap_service
LOG = logging.getLogger(__name__)
TAP_MIRROR = 'tap_mirror'
TAP_MIRRORS = f'{TAP_MIRROR}s'
_attr_map = (
_attr_map = [
('id', 'ID', column_util.LIST_BOTH),
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
('name', 'Name', column_util.LIST_BOTH),
@@ -37,7 +36,7 @@ _attr_map = (
('directions', 'Directions', column_util.LIST_LONG_ONLY),
('remote_ip', 'Remote IP', column_util.LIST_BOTH),
('mirror_type', 'Mirror Type', column_util.LIST_LONG_ONLY),
)
]
def _get_columns(item):

View File

@@ -23,19 +23,18 @@ from openstackclient import command
from openstackclient.i18n import _
from openstackclient.identity import common
LOG = logging.getLogger(__name__)
TAP_SERVICE = 'tap_service'
TAP_SERVICES = f'{TAP_SERVICE}s'
_attr_map = (
_attr_map = [
('id', 'ID', column_util.LIST_BOTH),
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
('name', 'Name', column_util.LIST_BOTH),
('port_id', 'Port', column_util.LIST_BOTH),
('status', 'Status', column_util.LIST_BOTH),
)
]
def _add_updatable_args(parser):

View File

@@ -36,6 +36,8 @@ IGNORED_MODULES = (
class OpenStackShell(shell.OpenStackShell):
client_manager: clientmanager.ClientManager
def __init__(self):
command_manager = commandmanager.CommandManager(
'openstack.cli', ignored_modules=IGNORED_MODULES
@@ -57,8 +59,10 @@ class OpenStackShell(shell.OpenStackShell):
# about them
warnings.filterwarnings('ignore', module='openstack')
def build_option_parser(self, description, version):
parser = super().build_option_parser(description, version)
def build_option_parser(self, description, version, argparse_kwargs=None):
parser = super().build_option_parser(
description, version, argparse_kwargs
)
parser = clientmanager.build_plugin_option_parser(parser)
parser = auth.build_auth_plugins_option_parser(parser)
return parser

View File

@@ -61,7 +61,7 @@ class KeyValueHintAction(argparse.Action):
)
class AttachmentsColumn(cliff_columns.FormattableColumn[list[str]]):
class AttachmentsColumn(cliff_columns.FormattableColumn[list[ty.Any]]):
"""Formattable column for attachments column.
Unlike the parent FormattableColumn class, the initializer of the

View File

@@ -11,6 +11,7 @@
# under the License.
import logging
import typing as ty
from openstack import utils as sdk_utils
from osc_lib.cli import format_columns
@@ -56,12 +57,12 @@ def _format_attachment(attachment):
# VolumeAttachmentManager.create returns a dict while everything else
# returns a VolumeAttachment object
if isinstance(attachment, dict):
data = []
data: tuple[ty.Any, ...] = ()
for column in columns:
if column == 'connection_info':
data.append(format_columns.DictColumn(attachment[column]))
data += (format_columns.DictColumn(attachment[column]),)
continue
data.append(attachment[column])
data += (attachment[column],)
else:
data = utils.get_item_properties(
attachment,