# Copyright 2012-2013 OpenStack Foundation
# Copyright 2015 Dean Troyer
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

"""Command-line interface to the OpenStack APIs"""

import getpass
import logging
import sys
import traceback
import typing as ty

from cliff import _argparse
from cliff import app
from cliff import command
from cliff import commandmanager
from cliff import complete
from cliff import help
from cliff import interactive
from oslo_utils import importutils
from oslo_utils import strutils

from osc_lib.cli import client_config as cloud_config
from osc_lib import clientmanager
from osc_lib.command import timing
from osc_lib import exceptions as exc
from osc_lib.i18n import _
from osc_lib import logs
from osc_lib import utils
from osc_lib import version

# osprofiler is optional: try_import() yields None when it is not installed
osprofiler_profiler = importutils.try_import("osprofiler.profiler")


DEFAULT_DOMAIN = 'default'
DEFAULT_INTERFACE = 'public'


def prompt_for_password(prompt: str | None = None) -> str:
    """Prompt the user for a password.

    The prompt is only shown when stdin is a tty.

    :param prompt: text displayed to the user; ``'Password: '`` is used
        when this is empty or None
    :returns: the password that was typed in
    :raises exc.CommandError: when stdin is not a tty, the prompt was ended
        with EOF, or nothing was entered
    """
    if not prompt:
        prompt = 'Password: '
    pw = None
    # If stdin is a tty, try prompting for the password
    if hasattr(sys.stdin, 'isatty') and sys.stdin.isatty():
        # Check for Ctl-D
        try:
            pw = getpass.getpass(prompt)
        except EOFError:
            pass
    # No password because we didn't have a tty or nothing was entered
    if not pw:
        raise exc.CommandError(
            _(
                "No password entered, or found via"
                " --os-password or OS_PASSWORD"
            ),
        )
    return pw


class OpenStackShell(app.App):
    """cliff application that wires global options, cloud config and auth.

    Subclasses customise behaviour through the hooks ``_final_defaults()``,
    ``_load_plugins()`` and ``_load_commands()``, which ``initialize_app()``
    calls in that order.
    """

    CONSOLE_MESSAGE_FORMAT = '%(levelname)s: %(name)s %(message)s'

    client_manager: clientmanager.ClientManager

    log = logging.getLogger(__name__)
    # Class-level default kept for backward compatibility; __init__ gives
    # every instance its own list so timings are not shared between shells.
    timing_data: list[ty.Any] = []

    def __init__(
        self,
        description: str | None = None,
        version: str | None = None,
        command_manager: commandmanager.CommandManager | None = None,
        stdin: ty.TextIO | None = None,
        stdout: ty.TextIO | None = None,
        stderr: ty.TextIO | None = None,
        interactive_app_factory: type['interactive.InteractiveApp']
        | None = None,
        deferred_help: bool = False,
    ) -> None:
        """Set up the shell.

        :param description: application description; the docstring of this
            module is used when empty or None
        :param version: application version string handed to cliff
        :param command_manager: command manager to use; a manager for the
            ``openstack.cli`` namespace is created when not given
        :param stdin: forwarded to ``cliff.app.App``
        :param stdout: forwarded to ``cliff.app.App``
        :param stderr: forwarded to ``cliff.app.App``
        :param interactive_app_factory: forwarded to ``cliff.app.App``
        :param deferred_help: accepted for signature compatibility only;
            deferred help is always enabled because ``initialize_app()``
            calls ``print_help_if_requested()`` itself
        """
        # Patch command.Command to add a default auth_required = True
        setattr(command.Command, 'auth_required', True)

        # Some commands do not need authentication
        setattr(help.HelpCommand, 'auth_required', False)
        setattr(complete.CompleteCommand, 'auth_required', False)

        # Slight change to the meaning of --debug
        self.DEFAULT_DEBUG_VALUE = None
        self.DEFAULT_DEBUG_HELP = 'Set debug logging and traceback on errors.'

        # Per-instance list so clean_up() does not mutate the class attribute
        self.timing_data = []

        # Do default for positionals
        if not command_manager:
            cm = commandmanager.CommandManager('openstack.cli')
        else:
            cm = command_manager

        super().__init__(
            # __doc__ is None under ``python -OO``; do not crash on it
            description=(description or __doc__ or '').strip(),
            version=version,
            command_manager=cm,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            interactive_app_factory=interactive_app_factory,
            deferred_help=True,
        )

        # Until we have command line arguments parsed, dump any stack traces
        self.dump_stack_trace = True

        # Set in subclasses
        self.api_version = None

        self.command_options: list[str] = []

        self.do_profile = False

    def configure_logging(self) -> None:
        """Configure logging for the app."""
        self.log_configurator = logs.LogConfigurator(self.options)
        self.dump_stack_trace = self.log_configurator.dump_trace

    def run(self, argv: list[str]) -> int:
        """Run the application and return its exit code.

        Any ``Exception`` raised by the parent ``run()`` is logged (with a
        full traceback when ``dump_stack_trace`` is set) and turned into the
        return value 1 instead of propagating.

        :param argv: command line arguments, without the program name
        :returns: the parent's return value, or 1 after an exception
        """
        ret_val = 1
        self.command_options = argv
        try:
            ret_val = super().run(argv)
            return ret_val
        except Exception as e:
            # Make sure the error is visible even if logging was never
            # configured (no handlers on the root logger yet)
            if not logging.getLogger('').handlers:
                logging.basicConfig()
            if self.dump_stack_trace:
                self.log.error(traceback.format_exc())
            else:
                self.log.error('Exception raised: ' + str(e))

            return ret_val

        finally:
            self.log.debug("END return value: %s", ret_val)

    def init_profile(self) -> None:
        """Initialise osprofiler when it is installed and a key was given."""
        # Falsy when osprofiler is missing or no --os-profile value was set
        self.do_profile = osprofiler_profiler and self.options.profile
        if self.do_profile:
            osprofiler_profiler.init(self.options.profile)

    def close_profile(self) -> None:
        """Log the trace IDs of the current profile, if profiling is on."""
        if self.do_profile:
            profiler = osprofiler_profiler.get()
            trace_id = profiler.get_base_id()
            # Short ID for OpenTracing-based driver (64-bit id)
            short_id = profiler.get_shorten_id(trace_id)

            # NOTE(dbelova): let's use warning log level to see these messages
            # printed. In fact we can define custom log level here with value
            # bigger than most big default one (CRITICAL) or something like
            # that (PROFILE = 60 for instance), but not sure we need it here.
            self.log.warning(f"Trace ID: {trace_id}")
            self.log.warning(
                f"Short trace ID for OpenTracing-based drivers: {short_id}"
            )
            self.log.warning(
                "Display trace data with command:\n"
                f"osprofiler trace show --html {trace_id} "
            )

    def run_subcommand(self, argv: list[str]) -> int:
        """Run a single subcommand with profiling set up around it.

        :param argv: arguments of the subcommand
        :returns: the return value of the parent ``run_subcommand()``
        """
        self.init_profile()
        try:
            ret_value = super().run_subcommand(argv)
        finally:
            self.close_profile()
        return ret_value

    # NOTE(review): annotated as returning None although the parent's result
    # is returned below - confirm against cliff's App.interact() signature.
    def interact(self) -> None:
        """Run the interactive shell with profiling set up around it."""
        self.init_profile()
        try:
            ret_value = super().interact()
        finally:
            self.close_profile()
        return ret_value

    def build_option_parser(
        self,
        description: str | None,
        version: str | None,
        argparse_kwargs: dict[str, ty.Any] | None = None,
    ) -> _argparse.ArgumentParser:
        """Extend the parent's parser with the global OpenStack options.

        Most defaults are read from the matching ``OS_*`` environment
        variable through ``utils.env()``.

        :param description: passed through to the parent
        :param version: passed through to the parent
        :param argparse_kwargs: passed through to the parent
        :returns: the parser with the global options added
        """
        parser = super().build_option_parser(
            description,
            version,
            argparse_kwargs,
        )

        # Cloud selection argument
        parser.add_argument(
            '--os-cloud',
            metavar='',
            dest='cloud',
            default=utils.env('OS_CLOUD'),
            help=_('Cloud name in clouds.yaml (Env: OS_CLOUD)'),
        )

        # Global arguments
        parser.add_argument(
            '--os-region-name',
            metavar='',
            dest='region_name',
            default=utils.env('OS_REGION_NAME'),
            help=_('Authentication region name (Env: OS_REGION_NAME)'),
        )
        parser.add_argument(
            '--os-cacert',
            metavar='',
            dest='cacert',
            default=utils.env('OS_CACERT', default=None),
            help=_('CA certificate bundle file (Env: OS_CACERT)'),
        )
        parser.add_argument(
            '--os-cert',
            metavar='',
            dest='cert',
            default=utils.env('OS_CERT'),
            help=_('Client certificate bundle file (Env: OS_CERT)'),
        )
        parser.add_argument(
            '--os-key',
            metavar='',
            dest='key',
            default=utils.env('OS_KEY'),
            help=_('Client certificate key file (Env: OS_KEY)'),
        )
        # --verify and --insecure cannot be combined; both default to None
        # so "not given" stays distinguishable from an explicit choice
        verify_group = parser.add_mutually_exclusive_group()
        verify_group.add_argument(
            '--verify',
            action='store_true',
            default=None,
            help=_('Verify server certificate (default)'),
        )
        verify_group.add_argument(
            '--insecure',
            action='store_true',
            default=None,
            help=_('Disable server certificate verification'),
        )
        parser.add_argument(
            '--os-default-domain',
            metavar='',
            dest='default_domain',
            default=utils.env('OS_DEFAULT_DOMAIN', default=DEFAULT_DOMAIN),
            help=_('Default domain ID, default=%s. (Env: OS_DEFAULT_DOMAIN)')
            % DEFAULT_DOMAIN,
        )
        parser.add_argument(
            '--os-interface',
            metavar='',
            dest='interface',
            choices=['admin', 'public', 'internal'],
            # do not set default value inside utils.env() else user's cloud
            # config key 'interface' will be ignored. Use OSC_Config's ctor
            # option 'override_defaults' below instead.
            default=utils.env('OS_INTERFACE'),
            help=_(
                'Select an interface type.'
                ' Valid interface types: [admin, public, internal].'
                ' default=%s, (Env: OS_INTERFACE)'
            )
            % DEFAULT_INTERFACE,
        )
        parser.add_argument(
            '--os-service-provider',
            metavar='',
            dest='service_provider',
            default=utils.env('OS_SERVICE_PROVIDER'),
            help=_(
                'Authenticate with and perform the command on a service'
                ' provider using Keystone-to-keystone federation. Must'
                ' also specify the remote project option.'
            ),
        )
        remote_project_group = parser.add_mutually_exclusive_group()
        remote_project_group.add_argument(
            '--os-remote-project-name',
            metavar='',
            dest='remote_project_name',
            default=utils.env('OS_REMOTE_PROJECT_NAME'),
            help=_(
                'Project name when authenticating to a service provider'
                ' if using Keystone-to-Keystone federation.'
            ),
        )
        remote_project_group.add_argument(
            '--os-remote-project-id',
            metavar='',
            dest='remote_project_id',
            default=utils.env('OS_REMOTE_PROJECT_ID'),
            help=_(
                'Project ID when authenticating to a service provider'
                ' if using Keystone-to-Keystone federation.'
            ),
        )
        remote_project_domain_group = parser.add_mutually_exclusive_group()
        remote_project_domain_group.add_argument(
            '--os-remote-project-domain-name',
            metavar='',
            dest='remote_project_domain_name',
            default=utils.env('OS_REMOTE_PROJECT_DOMAIN_NAME'),
            help=_(
                'Domain name of the project when authenticating to a'
                ' service provider if using Keystone-to-Keystone'
                ' federation.'
            ),
        )
        remote_project_domain_group.add_argument(
            '--os-remote-project-domain-id',
            metavar='',
            dest='remote_project_domain_id',
            default=utils.env('OS_REMOTE_PROJECT_DOMAIN_ID'),
            help=_(
                'Domain ID of the project when authenticating to a'
                ' service provider if using Keystone-to-Keystone'
                ' federation.'
            ),
        )
        parser.add_argument(
            '--timing',
            default=False,
            action='store_true',
            help=_("Print API call timing info"),
        )
        parser.add_argument(
            '--os-beta-command',
            action='store_true',
            help=_("Enable beta commands which are subject to change"),
        )

        # osprofiler HMAC key argument
        if osprofiler_profiler:
            parser.add_argument(
                '--os-profile',
                metavar='hmac-key',
                dest='profile',
                default=utils.env('OS_PROFILE'),
                help=_('HMAC key for encrypting profiling context data'),
            )

        return parser

    # Break up initialize_app() so that overriding it in a subclass does not
    # require duplicating a lot of the method
    #
    # * super()
    # * _final_defaults()
    # * OpenStackConfig
    # * get_one
    # * _load_plugins()
    # * _load_commands()
    # * ClientManager

    def _final_defaults(self) -> None:
        """Fill in defaults that depend on the parsed options.

        Mirrors the project_* and tenant_* options onto each other and
        stores the default domain on the shell.
        """
        # Set the default plugin to None
        # NOTE(dtroyer): This is here to set up for setting it to a default
        #                in the calling CLI
        self._auth_type = None

        # Converge project/tenant options
        project_id = getattr(self.options, 'project_id', None)
        project_name = getattr(self.options, 'project_name', None)
        tenant_id = getattr(self.options, 'tenant_id', None)
        tenant_name = getattr(self.options, 'tenant_name', None)

        # handle some v2/v3 authentication inconsistencies by just acting like
        # both the project and tenant information are both present. This can
        # go away if we stop registering all the argparse options together.
        if project_id and not tenant_id:
            self.options.tenant_id = project_id
        if project_name and not tenant_name:
            self.options.tenant_name = project_name
        if tenant_id and not project_id:
            self.options.project_id = tenant_id
        if tenant_name and not project_name:
            self.options.project_name = tenant_name

        # Save default domain
        self.default_domain = self.options.default_domain

    def _load_plugins(self) -> None:
        """Load plugins via stevedore

        osc-lib has no opinion on what plugins should be loaded
        """
        pass

    def _load_commands(self) -> None:
        """Load commands via cliff/stevedore

        osc-lib has no opinion on what commands should be loaded
        """
        pass

    def initialize_app(self, argv: list[str]) -> None:
        """Global app init bits:

        * set up API versions
        * validate authentication info
        * authenticate against Identity if requested

        :param argv: remaining command line arguments, passed to the parent
        :raises OSError: re-raised when constructing the cloud configuration
            fails with it
        """
        # Parent __init__ parses argv into self.options
        super().initialize_app(argv)
        self.log.info(
            "START with options: %s",
            strutils.mask_password(" ".join(self.command_options))
            if self.command_options
            else "",
        )
        self.log.debug("options: %s", strutils.mask_password(self.options))

        # Callout for stuff between superclass init and o-c-c
        self._final_defaults()

        # Do configuration file handling
        try:
            self.cloud_config = cloud_config.OSC_Config(
                pw_func=prompt_for_password,
                override_defaults={
                    'interface': DEFAULT_INTERFACE,
                    'auth_type': self._auth_type,
                },
            )
        except OSError:
            self.log.critical("Could not read clouds.yaml configuration file")
            self.print_help_if_requested()
            raise

        # TODO(thowe): Change cliff so the default value for debug
        # can be set to None.
        if not self.options.debug:
            self.options.debug = None

        # NOTE(dtroyer): Need to do this with validate=False to defer the
        #                auth plugin handling to ClientManager.setup_auth()
        self.cloud = self.cloud_config.get_one(
            cloud=self.options.cloud,
            argparse=self.options,
            validate=False,
        )

        self.log_configurator.configure(self.cloud)
        self.dump_stack_trace = self.log_configurator.dump_trace
        self.log.debug("defaults: %s", self.cloud_config.defaults)
        self.log.debug(
            "cloud cfg: %s", strutils.mask_password(self.cloud.config)
        )

        # Callout for stuff between o-c-c and ClientManager
        # self._initialize_app_2(self.options)

        self._load_plugins()

        self._load_commands()

        # Handle deferred help and exit
        self.print_help_if_requested()

        self.client_manager = clientmanager.ClientManager(
            cli_options=self.cloud,
            api_version=self.api_version,
            pw_func=prompt_for_password,
        )

    def prepare_to_run_command(self, cmd: 'command.Command') -> None:
        """Set up auth and API versions

        :param cmd: the command about to be run; its ``auth_required`` and
            ``required_scope`` attributes, when present, decide how much
            authentication set-up is done
        """
        self.log.debug(
            'command: %s -> %s.%s (auth=%s)',
            getattr(cmd, 'cmd_name', ''),
            cmd.__class__.__module__,
            cmd.__class__.__name__,
            getattr(cmd, 'auth_required', None),
        )

        # NOTE(dtroyer): If auth is not required for a command, skip
        #                get_one()'s validation to avoid loading plugins
        validate = getattr(cmd, 'auth_required', False)

        # NOTE(dtroyer): Save the auth required state of the _current_ command
        #                in the ClientManager
        self.client_manager._auth_required = validate

        # Validate auth options
        self.cloud = self.cloud_config.get_one(
            cloud=self.options.cloud,
            argparse=self.options,
            validate=validate,
            app_name=self.client_manager._app_name,
            app_version=self.client_manager._app_version,
            additional_user_agent=[('osc-lib', version.version_string)],
        )

        # Push the updated args into ClientManager
        self.client_manager._cli_options = self.cloud

        if validate:
            self.client_manager.setup_auth()
            if hasattr(cmd, 'required_scope') and cmd.required_scope:
                # let the command decide whether we need a scoped token
                self.client_manager.validate_scope()
                # Trigger the Identity client to initialize
                self.client_manager.session.auth.auth_ref = (  # type: ignore
                    self.client_manager.auth_ref
                )

    def clean_up(
        self,
        cmd: 'command.Command',
        result: int,
        err: BaseException | None,
    ) -> None:
        """Release connections and, with --timing, print the timing report.

        :param cmd: the command that was run
        :param result: the command's return code (not used here)
        :param err: the exception raised by the command, if any; only logged
        """
        self.log.debug('clean_up %s: %s', cmd.__class__.__name__, err or '')

        # Close SDK connection if available to have proper cleanup there
        if getattr(self.client_manager, "sdk_connection", None) is not None:
            self.client_manager.sdk_connection.close()

        # Close session if available
        if getattr(self.client_manager, "session", None) is not None:
            self.client_manager.session.session.close()

        # Process collected timing data
        if self.options.timing:
            # Get session data
            self.timing_data.extend(
                self.client_manager.session.get_timings(),
            )

            # Use the Timing pseudo-command to generate the output
            tcmd = timing.Timing(self, self.options)
            tparser = tcmd.get_parser('Timing')

            # If anything other than prettytable is specified, force csv
            output_format = 'table'
            # Check the formatter used in the actual command
            if (
                hasattr(cmd, 'formatter')
                and hasattr(cmd, '_formatter_plugins')
                and cmd.formatter != cmd._formatter_plugins['table'].obj
            ):
                output_format = 'csv'

            sys.stdout.write('\n')
            targs = tparser.parse_args(['-f', output_format])
            tcmd.run(targs)


def main(argv: list[str] | None = None) -> int:
    """Run an ``OpenStackShell`` and return its exit code.

    :param argv: command line arguments without the program name;
        ``sys.argv[1:]`` is used when None
    :returns: the value returned by ``OpenStackShell.run()``
    """
    if argv is None:
        argv = sys.argv[1:]
    return OpenStackShell().run(argv)


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))