Files
rally-openstack/rally_openstack/task/contexts/keystone/users.py
Andriy Kurilin 14e494d197 Base mypy support
Change-Id: Idf4283556dbc5a0af9a07d317d13cf20d9b496fb
Signed-off-by: Andriy Kurilin <andr.kurilin@gmail.com>
2025-09-09 17:36:10 +02:00

342 lines
14 KiB
Python

# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import collections
import copy
import typing as t
import uuid
from rally.common import broker
from rally.common import cfg
from rally.common import logging
from rally.common import utils
from rally.common import validation
from rally import exceptions
from rally_openstack.common import consts
from rally_openstack.common import credential
from rally_openstack.common import osclients
from rally_openstack.common.services.identity import identity
from rally_openstack.common.services.network import neutron
from rally_openstack.task import context
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
RESOURCE_MANAGEMENT_WORKERS_DESCR = (
"The number of concurrent threads to use for serving users context.")
PROJECT_DOMAIN_DESCR = "ID of domain in which projects will be created."
USER_DOMAIN_DESCR = "ID of domain in which users will be created."
@validation.add("required_platform", platform="openstack", users=True)
@context.configure(name="users", platform="openstack", order=100)
class UserGenerator(context.OpenStackContext):
"""Creates specified amount of keystone users and tenants."""
CONFIG_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"anyOf": [
{"description": "Create new temporary users and tenants.",
"properties": {
"tenants": {
"type": "integer",
"minimum": 1,
"description": "The number of tenants to create."
},
"users_per_tenant": {
"type": "integer",
"minimum": 1,
"description": "The number of users to create per one "
"tenant."},
"user_password": {
"type": "string",
"description": "Specify custom user password instead of "
"randomly generated in case of password "
"requirements."},
"resource_management_workers": {
"type": "integer",
"minimum": 1,
"description": RESOURCE_MANAGEMENT_WORKERS_DESCR},
"project_domain": {
"type": "string",
"description": PROJECT_DOMAIN_DESCR},
"user_domain": {
"type": "string",
"description": USER_DOMAIN_DESCR},
"user_choice_method": {
"$ref": "#/definitions/user_choice_method"}},
"additionalProperties": False},
# TODO(andreykurilin): add ability to specify users here.
{"description": "Use existing users and tenants.",
"properties": {
"user_choice_method": {
"$ref": "#/definitions/user_choice_method"}
},
"additionalProperties": False}
],
"definitions": {
"user_choice_method": {
"enum": ["random", "round_robin"],
"description": "The mode of balancing usage of users between "
"scenario iterations."}
}
}
DEFAULT_CONFIG = {"user_choice_method": "random"}
DEFAULT_FOR_NEW_USERS = {
"tenants": 1,
"users_per_tenant": 1,
"resource_management_workers":
cfg.CONF.openstack.users_context_resource_management_workers,
}
def __init__(self, context):
super(UserGenerator, self).__init__(context)
creds = self.env["platforms"]["openstack"]
if creds.get("admin"):
admin_cred = copy.deepcopy(creds["admin"])
api_info = copy.deepcopy(creds.get("api_info", {}))
if "api_info" in admin_cred:
api_info.update(creds["admin"]["api_info"])
admin_cred["api_info"] = api_info
context["admin"] = {
"credential": credential.OpenStackCredential(**admin_cred)
}
if creds["users"] and not (set(self.config) - {"user_choice_method"}):
self.existing_users = creds["users"]
else:
self.existing_users = []
self.credential = context["admin"]["credential"]
project_domain = (self.credential["project_domain_name"]
or cfg.CONF.openstack.project_domain)
user_domain = (self.credential["user_domain_name"]
or cfg.CONF.openstack.user_domain)
self.DEFAULT_FOR_NEW_USERS["project_domain"] = project_domain
self.DEFAULT_FOR_NEW_USERS["user_domain"] = user_domain
with t.cast(utils.LockedDict, self.config).unlocked():
for key, value in self.DEFAULT_FOR_NEW_USERS.items():
self.config.setdefault(key, value)
def _create_tenants(self, threads):
tenants = collections.deque()
def publish(queue):
for i in range(self.config["tenants"]):
args = (self.config["project_domain"], self.task["uuid"], i)
queue.append(args)
def consume(cache, args):
domain, task_id, i = args
if "client" not in cache:
clients = osclients.Clients(self.credential)
cache["client"] = identity.Identity(
clients, name_generator=self.generate_random_name)
tenant = cache["client"].create_project(domain_name=domain)
tenant_dict = {"id": tenant.id, "name": tenant.name, "users": []}
tenants.append(tenant_dict)
# NOTE(msdubov): consume() will fill the tenants list in the closure.
broker.run(publish, consume, threads)
tenants_dict = {}
for tenant in tenants:
tenants_dict[tenant["id"]] = tenant
return tenants_dict
def _create_users(self, threads):
# NOTE(msdubov): This should be called after _create_tenants().
users_per_tenant = self.config["users_per_tenant"]
default_role = cfg.CONF.openstack.keystone_default_role
users = collections.deque()
def publish(queue):
for tenant_id in self.context["tenants"]:
for user_id in range(users_per_tenant):
username = self.generate_random_name()
password = (str(uuid.uuid4())
if self.config.get("user_password") is None
else self.config["user_password"])
args = (username, password, self.config["project_domain"],
self.config["user_domain"], tenant_id)
queue.append(args)
def consume(cache, args):
username, password, project_dom, user_dom, tenant_id = args
if "client" not in cache:
clients = osclients.Clients(self.credential)
cache["client"] = identity.Identity(
clients, name_generator=self.generate_random_name)
client = cache["client"]
user = client.create_user(username, password=password,
project_id=tenant_id,
domain_name=user_dom,
default_role=default_role)
user_credential = credential.OpenStackCredential(
auth_url=self.credential["auth_url"],
username=user.name,
password=password,
tenant_name=self.context["tenants"][tenant_id]["name"],
permission=consts.EndpointPermission.USER,
project_domain_name=project_dom,
user_domain_name=user_dom,
endpoint_type=self.credential["endpoint_type"],
https_insecure=self.credential["https_insecure"],
https_cacert=self.credential["https_cacert"],
region_name=self.credential["region_name"],
profiler_hmac_key=self.credential["profiler_hmac_key"],
profiler_conn_str=self.credential["profiler_conn_str"],
api_info=self.credential["api_info"])
users.append({"id": user.id,
"credential": user_credential,
"tenant_id": tenant_id})
# NOTE(msdubov): consume() will fill the users list in the closure.
broker.run(publish, consume, threads)
return list(users)
def create_users(self):
"""Create tenants and users, using the broker pattern."""
threads = min(self.config["resource_management_workers"],
self.config["tenants"])
LOG.debug("Creating %(tenants)d tenants using %(threads)s threads"
% {"tenants": self.config["tenants"], "threads": threads})
self.context["tenants"] = self._create_tenants(threads)
if len(self.context["tenants"]) < self.config["tenants"]:
raise exceptions.ContextSetupFailure(
ctx_name=self.get_name(),
msg="Failed to create the requested number of tenants.")
users_num = self.config["users_per_tenant"] * self.config["tenants"]
threads = min(self.config["resource_management_workers"], users_num)
LOG.debug("Creating %(users)d users using %(threads)s threads"
% {"users": users_num, "threads": threads})
self.context["users"] = self._create_users(threads)
for user in self.context["users"]:
self.context["tenants"][user["tenant_id"]]["users"].append(user)
if len(self.context["users"]) < users_num:
raise exceptions.ContextSetupFailure(
ctx_name=self.get_name(),
msg="Failed to create the requested number of users.")
def use_existing_users(self):
LOG.debug("Using existing users for OpenStack platform.")
api_info = copy.deepcopy(self.env["platforms"]["openstack"].get(
"api_info", {}))
self.context["config"]["existing_users"] = self.existing_users
for user_credential in self.existing_users:
user_credential = copy.deepcopy(user_credential)
if "api_info" in user_credential:
api_info.update(user_credential["api_info"])
user_credential["api_info"] = api_info
user_credential = credential.OpenStackCredential(**user_credential)
user_clients = osclients.Clients(user_credential)
user_id = user_clients.keystone.auth_ref.user_id
tenant_id = user_clients.keystone.auth_ref.project_id
if tenant_id not in self.context["tenants"]:
self.context["tenants"][tenant_id] = {
"id": tenant_id,
"name": user_credential.tenant_name
}
self.context["users"].append({
"credential": user_credential,
"id": user_id,
"tenant_id": tenant_id
})
def setup(self):
self.context["users"] = []
self.context["tenants"] = {}
self.context["user_choice_method"] = self.config["user_choice_method"]
if self.existing_users:
self.use_existing_users()
else:
self.create_users()
def _remove_default_security_group(self):
"""Delete default security group for tenants."""
admin_client = neutron.NeutronService(
clients=osclients.Clients(self.credential),
atomic_inst=self.atomic_actions()
)
if not admin_client.supports_extension("security-group", silent=True):
LOG.debug("Security group context is disabled.")
return
security_groups = admin_client.list_security_groups(name="default")
for security_group in security_groups:
if security_group["tenant_id"] not in self.context["tenants"]:
continue
admin_client.delete_security_group(security_group["id"])
def _get_consumer_for_deletion(self, func_name):
def consume(cache, resource_id):
if "client" not in cache:
clients = osclients.Clients(self.credential)
cache["client"] = identity.Identity(clients)
getattr(cache["client"], func_name)(resource_id)
return consume
def _delete_tenants(self):
threads = self.config["resource_management_workers"]
def publish(queue):
for tenant_id in self.context["tenants"]:
queue.append(tenant_id)
broker.run(publish, self._get_consumer_for_deletion("delete_project"),
threads)
self.context["tenants"] = {}
def _delete_users(self):
threads = self.config["resource_management_workers"]
def publish(queue):
for user in self.context["users"]:
queue.append(user["id"])
broker.run(publish, self._get_consumer_for_deletion("delete_user"),
threads)
self.context["users"] = []
def cleanup(self):
"""Delete tenants and users, using the broker pattern."""
if self.existing_users:
# nothing to do here.
return
else:
self._remove_default_security_group()
self._delete_users()
self._delete_tenants()