Merge "openstack delete user forbidden checks"

This commit is contained in:
Zuul
2025-07-24 13:37:44 +00:00
committed by Gerrit Code Review
8 changed files with 500 additions and 0 deletions

View File

@@ -0,0 +1,45 @@
class OpenstackTokenIssueObject:
"""
Class to represent an OpenStack token from the token issue output.
"""
def __init__(self):
"""
Constructor.
"""
self.id = None
self.expires = None
self.project_id = None
self.user_id = None
def set_id(self, token_id: str):
"""Setter for token ID"""
self.id = token_id
def get_id(self) -> str:
"""Getter for token ID"""
return self.id
def set_expires(self, expires: str):
"""Setter for expires"""
self.expires = expires
def get_expires(self) -> str:
"""Getter for expires"""
return self.expires
def set_project_id(self, project_id: str):
"""Setter for project ID"""
self.project_id = project_id
def get_project_id(self) -> str:
"""Getter for project ID"""
return self.project_id
def set_user_id(self, user_id: str):
"""Setter for user ID"""
self.user_id = user_id
def get_user_id(self) -> str:
"""Getter for user ID"""
return self.user_id

View File

@@ -0,0 +1,78 @@
from framework.exceptions.keyword_exception import KeywordException
from keywords.cloud_platform.openstack.openstack_table_parser import OpenStackTableParser
from keywords.openstack.openstack.token.object.openstack_token_issue_object import OpenstackTokenIssueObject
class OpenstackTokenIssueOutput:
"""
Class to parse and handle the output of 'openstack token issue' command.
"""
def __init__(self, output: str):
"""
Constructor.
Args:
output (str): Raw output from 'openstack token issue' command
"""
self.output = output.split("\n") if isinstance(output, str) else output
self.table_parser = OpenStackTableParser(self.output)
self.token = self._build_token_object()
def _build_token_object(self) -> OpenstackTokenIssueObject:
"""
Build OpenstackTokenIssueObject from parsed table data.
Returns:
OpenstackTokenIssueObject: Token object instance
"""
output_values_list = self.table_parser.get_output_values_list()
token = OpenstackTokenIssueObject()
for item in output_values_list:
field = item.get("Field", "")
value = item.get("Value", "")
if field == "id":
token.set_id(value)
elif field == "expires":
token.set_expires(value)
elif field == "project_id":
token.set_project_id(value)
elif field == "user_id":
token.set_user_id(value)
return token
def get_token(self) -> OpenstackTokenIssueObject:
"""
Get the token object.
Returns:
OpenstackTokenIssueObject: Token object instance
"""
return self.token
def get_token_id(self) -> str:
"""
Extract the token ID from the token object.
Returns:
str: Token ID
Raises:
KeywordException: If token ID is not found or token parsing failed
"""
token_id = self.token.get_id()
if not token_id:
raise KeywordException("Failed to parse token ID from OpenStack token issue output")
return token_id
def get_raw_output(self) -> str:
"""
Get the raw output from the command.
Returns:
str: Raw command output
"""
return self.output

View File

@@ -0,0 +1,43 @@
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.openstack.openstack.token.object.openstack_token_issue_output import OpenstackTokenIssueOutput
class OpenstackTokenKeywords(BaseKeyword):
"""
Class for OpenStack Token operations
"""
def __init__(self, ssh_connection: SSHConnection):
"""
Constructor.
Args:
ssh_connection (SSHConnection): SSH connection to the OpenStack controller
"""
self.ssh_connection = ssh_connection
def token_issue(self) -> OpenstackTokenIssueOutput:
"""
Issue an OpenStack authentication token.
Returns:
OpenstackTokenIssueOutput: Object containing parsed token data
"""
cmd = "openstack token issue"
output = self.ssh_connection.send(source_openrc(cmd), get_pty=True)
self.validate_success_return_code(self.ssh_connection)
complete_output = "\n".join(output) if isinstance(output, list) else output
return OpenstackTokenIssueOutput(complete_output)
def get_auth_token(self) -> str:
"""
Get authentication token ID for API calls.
Returns:
str: Authentication token ID
"""
token_output = self.token_issue()
return token_output.get_token_id()

View File

@@ -0,0 +1,45 @@
class OpenstackUserListObject:
"""
Class to represent a single OpenStack user from the user list output.
"""
def __init__(self):
"""
Constructor.
"""
self.id = None
self.name = None
self.domain_id = None
self.enabled = None
def set_id(self, user_id: str):
"""Setter for user ID"""
self.id = user_id
def get_id(self) -> str:
"""Getter for user ID"""
return self.id
def set_name(self, name: str):
"""Setter for user name"""
self.name = name
def get_name(self) -> str:
"""Getter for user name"""
return self.name
def set_domain_id(self, domain_id: str):
"""Setter for domain ID"""
self.domain_id = domain_id
def get_domain_id(self) -> str:
"""Getter for domain ID"""
return self.domain_id
def set_enabled(self, enabled: str):
"""Setter for enabled status"""
self.enabled = enabled
def get_enabled(self) -> str:
"""Getter for enabled status"""
return self.enabled

View File

@@ -0,0 +1,76 @@
from keywords.cloud_platform.openstack.openstack_table_parser import OpenStackTableParser
from keywords.openstack.openstack.user.object.openstack_user_list_object import OpenstackUserListObject
class OpenstackUserListOutput:
"""
Class to parse and handle the output of 'openstack user list' command.
"""
def __init__(self, output: str):
"""
Constructor.
Args:
output (str): Raw output from 'openstack user list' command
"""
self.output = output.split("\n") if isinstance(output, str) else output
self.table_parser = OpenStackTableParser(self.output)
self.users = self._build_user_objects()
def _build_user_objects(self) -> list:
"""
Build list of OpenstackUserListObject from parsed table data.
Returns:
list: List of OpenstackUserListObject instances
"""
output_values_list = self.table_parser.get_output_values_list()
users = []
for item in output_values_list:
if item.get("ID", "").strip(): # Only create objects for valid entries
user = OpenstackUserListObject()
user.set_id(item.get("ID", ""))
user.set_name(item.get("Name", ""))
user.set_domain_id(item.get("Domain ID", ""))
user.set_enabled(item.get("Enabled", ""))
users.append(user)
return users
def get_users(self) -> list:
"""
Get list of user objects.
Returns:
list: List of OpenstackUserListObject instances
"""
return self.users
def get_user_ids(self) -> list:
"""
Extract all user IDs from the user objects.
Returns:
list: List of user IDs
"""
return [user.get_id() for user in self.users]
def get_user_names(self) -> list:
"""
Extract all user names from the user objects.
Returns:
list: List of user names
"""
return [user.get_name() for user in self.users]
def get_raw_output(self) -> str:
"""
Get the raw output from the command.
Returns:
str: Raw command output
"""
return self.output

View File

@@ -0,0 +1,71 @@
from config.configuration_manager import ConfigurationManager
from framework.exceptions.keyword_exception import KeywordException
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.openstack.openstack.token.openstack_token_keywords import OpenstackTokenKeywords
class OpenstackUserCurlKeywords(BaseKeyword):
"""
Class for OpenStack User operations using curl commands
"""
def __init__(self, ssh_connection: SSHConnection):
"""
Constructor.
Args:
ssh_connection (SSHConnection): SSH connection to the OpenStack controller
"""
self.ssh_connection = ssh_connection
def get_auth_token(self) -> str:
"""
Get authentication token for API calls.
Returns:
str: Authentication token
Raises:
KeywordException: If token cannot be retrieved
"""
token_keywords = OpenstackTokenKeywords(self.ssh_connection)
token = token_keywords.get_auth_token()
if not token:
raise KeywordException("Failed to retrieve authentication token")
return token
def get_lab_fip(self) -> str:
"""
Get the floating IP for the lab environment.
Uses the ACE framework configuration manager.
Returns:
str: Floating IP address with brackets for IPv6, without for IPv4
"""
lab_config = ConfigurationManager.get_lab_config()
fip = lab_config.get_floating_ip()
if lab_config.is_ipv6():
return f"[{fip}]"
return fip
def delete_user_via_curl(self, user_id: str) -> tuple:
"""
Delete user via curl command.
Args:
user_id (str): User ID to delete
Returns:
tuple: (return_code, output)
"""
token = self.get_auth_token()
fip = self.get_lab_fip()
curl_cmd = f'curl -s -X DELETE "https://{fip}:5000/v3/users/{user_id}" -H "X-Auth-Token: {token}" | python -m json.tool'
output = self.ssh_connection.send(curl_cmd, get_pty=True)
return_code = self.ssh_connection.get_return_code()
return return_code, "\n".join(output) if isinstance(output, list) else output

View File

@@ -0,0 +1,55 @@
from framework.ssh.ssh_connection import SSHConnection
from keywords.base_keyword import BaseKeyword
from keywords.cloud_platform.command_wrappers import source_openrc
from keywords.openstack.openstack.user.object.openstack_user_list_output import OpenstackUserListOutput
class OpenstackUserListKeywords(BaseKeyword):
"""
Class for OpenStack User List operations
"""
def __init__(self, ssh_connection: SSHConnection):
"""
Constructor.
Args:
ssh_connection (SSHConnection): SSH connection to the OpenStack controller
"""
self.ssh_connection = ssh_connection
def get_openstack_user_list(self) -> OpenstackUserListOutput:
"""
Get list of all OpenStack users.
Returns:
OpenstackUserListOutput: Object containing parsed user list data
"""
cmd = "openstack user list"
output = self.ssh_connection.send(source_openrc(cmd), get_pty=True)
self.validate_success_return_code(self.ssh_connection)
# Use the complete output, not just the first line
complete_output = "\n".join(output) if isinstance(output, list) else output
return OpenstackUserListOutput(complete_output)
def delete_user(self, user_id: str) -> str:
"""
Delete an OpenStack user by ID.
Args:
user_id (str): The user ID to delete
Returns:
str: Command output message
"""
cmd = f"openstack user delete {user_id}"
output = self.ssh_connection.send(source_openrc(cmd), get_pty=True)
# Handle empty output or return complete output
if not output:
return "No output received from delete command"
# Join all output lines to capture complete error message
return "\n".join(output) if isinstance(output, list) else str(output)

View File

@@ -0,0 +1,87 @@
"""Test cases for deleting keystone users using openstack user delete command
and verifying the user is not able to delete with the valid message"""
from pytest import mark
from framework.logging.automation_logger import get_logger
from framework.validation.validation import validate_equals, validate_not_equals, validate_str_contains
from keywords.cloud_platform.ssh.lab_connection_keywords import LabConnectionKeywords
from keywords.openstack.openstack.user.openstack_user_curl_keywords import OpenstackUserCurlKeywords
from keywords.openstack.openstack.user.openstack_user_keywords import OpenstackUserListKeywords
@mark.p2
def test_openstack_delete_user_forbidden():
"""
Test delete user via openstack CLI is forbidden.
Test Steps:
1. Get auth_info and connection to use for subsequent CLI calls
2. Get list of all users
3. Attempt to delete each user and verify forbidden message
4. Verify that deletion is properly blocked with expected error message
"""
get_logger().log_info("Get SSH connection to active controller...")
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
get_logger().log_info("Initialize OpenStack user keywords...")
user_keywords = OpenstackUserListKeywords(ssh_connection)
get_logger().log_info("Get list of all OpenStack users...")
user_list_output = user_keywords.get_openstack_user_list()
user_ids = user_list_output.get_user_ids()
validate_not_equals(len(user_ids), 0, "Verify users found in the system")
get_logger().log_info(f"Found {len(user_ids)} users to test deletion")
get_logger().log_info("Attempt to delete users and verify forbidden message...")
# Attempt to delete each user
for user_id in user_ids:
result = user_keywords.delete_user(user_id)
get_logger().log_info(f"Delete user {user_id} result: {result}")
expected_message = "You are forbidden to perform the requested action. This action is system-critical and cannot be executed- identity:delete_user. Please contact your administrator for further assistance. (HTTP 403)"
validate_str_contains(result, expected_message, f"Verify forbidden message for user {user_id}")
get_logger().log_info("All user deletion attempts were properly forbidden as expected")
@mark.p2
def test_openstack_delete_user_curl_forbidden():
"""
Test delete user via curl is forbidden.
Test Steps:
1. Get auth_info and connection to use for subsequent curl calls
2. Get list of all users
3. Attempt to delete each user via curl and verify forbidden message
4. Verify that curl deletion is properly blocked with expected error message
"""
get_logger().log_info("Get SSH connection to active controller...")
ssh_connection = LabConnectionKeywords().get_active_controller_ssh()
get_logger().log_info("Initialize OpenStack user curl keywords...")
user_curl_keywords = OpenstackUserCurlKeywords(ssh_connection)
# Get user list using CLI keywords
user_keywords = OpenstackUserListKeywords(ssh_connection)
user_list_output = user_keywords.get_openstack_user_list()
user_ids = user_list_output.get_user_ids()
validate_not_equals(len(user_ids), 0, "Verify users found in the system")
get_logger().log_info(f"Found {len(user_ids)} users to test curl deletion")
get_logger().log_info("Attempt to delete users via curl and verify forbidden message...")
# Attempt to delete each user via curl
for user_id in user_ids:
rc, output = user_curl_keywords.delete_user_via_curl(user_id)
get_logger().log_info(f"Delete user {user_id} via curl result: RC={rc}, Output={output}")
expected_message = "You are forbidden to perform the requested action. This action is system-critical and cannot be executed- identity:delete_user. Please contact your administrator for further assistance."
validate_equals(rc, 0, f"Verify curl command succeeded for user {user_id}")
validate_str_contains(output, expected_message, f"Verify forbidden message in curl response for user {user_id}")
validate_str_contains(output, "403", f"Verify HTTP 403 status in curl response for user {user_id}")
get_logger().log_info("All curl user deletion attempts were properly forbidden as expected")