update/software-client/software_client/client.py
Joseph Vazhappilly 169b84bc6c Restrict software commands to keystone admin role
When using Keystone auth for software cli, only user with 'admin' role
is allowed to run any commands. When using software cli without 'sudo',
all software commands require user with 'admin' role.

This review also update the exception handling and error reporting.

Test Plan:
PASS: A Keystone user in the 'admin' project with 'admin' role should
      be able to run ALL 'software' commands WITHOUT SUDO
PASS: A Keystone user in the 'admin' project with only 'member' and/or
      'reader' role should NOT be able to run ANY 'software' commands
      WITHOUT SUDO

Story: 2010676
Task: 49754

Change-Id: I46653021b1a82bccded5eb870dc0907cd5c2351b
Signed-off-by: Joseph Vazhappilly <joseph.vazhappillypaily@windriver.com>
2024-03-25 03:24:32 -04:00

174 lines
6.8 KiB
Python

#
# Copyright (c) 2013-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from oslo_utils import importutils
from software_client import exc
from software_client.constants import TOKEN, KEYSTONE, LOCAL_ROOT
SERVICE_NAME = 'usm'
SERVICE_TYPE = 'usm'
API_PORT = "5493"
API_ENDPOINT = "http://127.0.0.1:" + API_PORT
def _make_session(**kwargs):
from keystoneauth1 import loading
"""Construct a session based on authentication information
:param kwargs: keyword args containing credentials, either:
* os_auth_token: pre-existing token to re-use
* system_url: system API endpoint
or:
* os_username: name of user
* os_password: user's password
* os_auth_url: endpoint to authenticate against
* insecure: allow insecure SSL (no cert verification)
* os_tenant_{name|id}: name or ID of tenant
* os_region_name: region of the service
* os_project_name: name of a project
* os_project_id: ID of a project
* os_user_domain_name: name of a domain the user belongs to
* os_user_domain_id: ID of a domain the user belongs to
* os_project_domain_name: name of a domain the project belongs to
* os_project_domain_id: ID of a domain the project belongs to
* timeout: request timeout (in seconds)
* ca_file: trusted CA file
* cert_file: client certificate file
* key_file: client key file
"""
session = None
if (kwargs.get('os_username') and
kwargs.get('os_password') and
kwargs.get('os_auth_url') and
(kwargs.get('os_project_id') or
kwargs.get('os_project_name'))):
auth_kwargs = {}
auth_url = kwargs.get('os_auth_url')
project_id = kwargs.get('os_project_id')
project_name = kwargs.get('os_project_name')
user_domain_id = kwargs.get('os_user_domain_id')
user_domain_name = kwargs.get('os_user_domain_name') or "Default"
project_domain_id = kwargs.get('os_project_domain_id')
project_domain_name = kwargs.get('os_project_domain_name') or "Default"
auth_type = 'password'
username = kwargs.get('os_username')
password = kwargs.get('os_password')
auth_kwargs.update({
'auth_url': auth_url,
'project_id': project_id,
'project_name': project_name,
'user_domain_id': user_domain_id,
'user_domain_name': user_domain_name,
'project_domain_id': project_domain_id,
'project_domain_name': project_domain_name,
'username': username,
'password': password
})
# construct the appropriate session
timeout = kwargs.get('timeout')
insecure = kwargs.get('insecure')
cacert = kwargs.get('ca_file')
cert = kwargs.get('cert_file')
key = kwargs.get('key_file')
loader = loading.get_plugin_loader(auth_type)
auth_plugin = loader.load_from_options(**auth_kwargs)
session = loading.session.Session().load_from_options(auth=auth_plugin,
timeout=timeout,
insecure=insecure,
cacert=cacert,
cert=cert,
key=key)
# session could still be None
return session
def get_client(api_version, auth_mode, session=None, service_type=SERVICE_TYPE, **kwargs):
"""Get an authenticated client, based on credentials in the keyword args.
:param api_version: the API version to use ('1' or '2')
:param auth_mode: the authentication mode (token, keystone, local_root)
:param session: the session to use (if it exists)
:param service_type: service_type should always be 'usm'
:param kwargs: additional keyword args to pass to the client or auth
"""
endpoint = kwargs.get('software_url')
auth_token = kwargs.get('os_auth_token')
local_root = auth_mode == LOCAL_ROOT
# if we have an endpoint and token, use those
if local_root or (endpoint and auth_token):
pass
elif not session:
# Make a session to determine the endpoint
session = _make_session(**kwargs)
if not endpoint:
if session:
try:
interface = kwargs.get('os_endpoint_type')
region_name = kwargs.get('os_region_name')
endpoint = session.get_endpoint(service_type=service_type,
interface=interface,
region_name=region_name)
except Exception as e:
msg = ('Failed to get openstack endpoint')
raise exc.EndpointException(
('%(message)s, error was: %(error)s') %
{'message': msg, 'error': e})
elif local_root:
endpoint = API_ENDPOINT
else:
exception_msg = ('Missing / invalid authorization credentials')
raise exc.AmbigiousAuthSystem(exception_msg)
if endpoint:
api_version_str = 'v' + api_version
if api_version_str not in endpoint.split('/'):
endpoint = endpoint + '/' + api_version_str
if session:
# this will be a LegacyJsonAdapter
cli_kwargs = {
'session': session,
'service_type': service_type,
'service_name': SERVICE_NAME,
'interface': kwargs.get('os_endpoint_type'),
'region_name': kwargs.get('os_region_name'),
'endpoint_override': endpoint,
'global_request_id': kwargs.get('global_request_id'),
'user_agent': kwargs.get('user_agent', 'software_client'),
'api_version': kwargs.get('system_api_version')
}
else:
# This will become a httplib2 object
auth_ref = None
cli_kwargs = {
'local_root': local_root,
'token': auth_token,
'insecure': kwargs.get('insecure'),
'cacert': kwargs.get('cacert'),
'timeout': kwargs.get('timeout'),
'ca_file': kwargs.get('ca_file'),
'cert_file': kwargs.get('cert_file'),
'key_file': kwargs.get('key_file'),
'auth_ref': auth_ref,
'auth_url': kwargs.get('os_auth_url'),
'api_version': kwargs.get('system_api_version')
}
return Client(api_version, endpoint, session, **cli_kwargs)
def Client(version, *args, **kwargs):
module = importutils.import_versioned_module('software_client',
version, 'client')
client_class = getattr(module, 'Client')
return client_class(*args, **kwargs)