Files
codegenerator/codegenerator/rust_cli.py
Artem Goncharov ca752ffa71 Improve UX clarity of "limit" query parameter
Rename "limit" query parameter of the list operations to "page-size" for
giving precise meaning (keep alias "limit"). This is necessary to
finally resolve the conflict that we have in the python-openstackclient
where "limit" means a page size and all entries are fetched unless you
also specify the marker (which is not something anybody can really
understand why).

Change-Id: I9a6ddcbe30d92c4d190229ac77a10c28eb0015fb
Signed-off-by: Artem Goncharov <artem.goncharov@gmail.com>
2025-06-26 16:17:49 +02:00

1194 lines
48 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import logging
from pathlib import Path
import subprocess
import re
from typing import Type
from codegenerator.base import BaseGenerator
from codegenerator import common
from codegenerator import model
from codegenerator.common import rust as common_rust
from codegenerator.common import BasePrimitiveType
from codegenerator.common import BaseCombinedType
from codegenerator.common import BaseCompoundType
BASIC_FIELDS = [
"id",
"name",
"title",
"created_at",
"updated_at",
"uuid",
"state",
"status",
"operating_status",
]
class BooleanFlag(common_rust.Boolean):
"""Boolean parameter that is represented as a CLI flag"""
type_hint: str = "bool"
clap_macros: set[str] = {"action=clap::ArgAction::SetTrue"}
original_data_type: BaseCompoundType | BasePrimitiveType | None = None
class String(common_rust.String):
"""CLI String type"""
clap_macros: set[str] = set()
original_data_type: BaseCompoundType | BaseCombinedType | None = None
# Temporary add string enum for parameters which we do not want to handle
# as StringEnums
enum: set[str] | None = None
# imports: set[str] = set(["dialoguer::Password"])
@property
def imports(self) -> set[str]:
if self.format and self.format == "password":
return {"dialoguer::Password"}
return set()
class SecretString(String):
"""CLI SecretString"""
pass
class JsonValue(common_rust.JsonValue):
"""Arbitrary JSON value"""
clap_macros: set[str] = {
'value_name="JSON"',
"value_parser=crate::common::parse_json",
}
original_data_type: BaseCombinedType | BaseCompoundType | None = None
@property
def imports(self):
imports: set[str] = {"serde_json::Value"}
if self.original_data_type and isinstance(
self.original_data_type, common_rust.Dictionary
):
imports.update(["std::collections::BTreeMap", "eyre::WrapErr"])
return imports
class StructInputField(common_rust.StructField):
"""Structure field of the CLI input"""
additional_clap_macros: set[str] = set()
@property
def type_hint(self):
typ_hint = self.data_type.type_hint
if self.is_optional:
typ_hint = f"Option<{typ_hint}>"
# Password input must be optional
if (
getattr(self.data_type, "format", None) == "password"
and not self.is_optional
):
typ_hint = f"Option<{typ_hint}>"
return typ_hint
@property
def builder_macros(self):
macros: set[str] = set()
if not isinstance(self.data_type, BaseCompoundType):
macros.update(self.data_type.builder_macros)
else:
macros.add("setter(into)")
if self.is_optional:
macros.add("default")
return f"#[builder({', '.join(sorted(macros))})]"
@property
def serde_macros(self):
macros = set()
if self.local_name != self.remote_name:
macros.add(f'rename="{self.remote_name}"')
return f"#[serde({', '.join(sorted(macros))})]"
@property
def clap_macros(self):
if isinstance(self.data_type, common_rust.Struct):
# For substrucs (and maybe enums) we tell Clap to flatten subtype
# instead of exposing attr itself
return "#[command(flatten)]"
if isinstance(self.data_type, common_rust.Option) and isinstance(
self.data_type.item_type, common_rust.Struct
):
return "#[command(flatten)]"
macros = {"long"}
try:
if self.data_type.clap_macros:
macros.update(self.data_type.clap_macros)
# i.e. CLI groups are managed through the code dynamically
macros.update(self.additional_clap_macros)
except Exception as ex:
logging.exception("Error getting clap_macros for %s: %s", self, ex)
macros.add('help_heading = "Body parameters"')
return f"#[arg({', '.join(sorted(macros))})]"
def clap_macros_ext(self, is_group: bool | None = None):
if isinstance(self.data_type, common_rust.Struct):
# For substrucs (and maybe enums) we tell Clap to flatten subtype
# instead of exposing attr itself
return "#[command(flatten)]"
macros = {"long"}
if is_group and not self.is_optional:
macros.add("required=false")
try:
if self.data_type.clap_macros:
macros.update(self.data_type.clap_macros)
# i.e. CLI groups are managed through the code dynamically
macros.update(self.additional_clap_macros)
except Exception as ex:
logging.exception("Error getting clap_macros for %s: %s", self, ex)
macros.add('help_heading = "Body parameters"')
return f"#[arg({', '.join(sorted(macros))})]"
@property
def description(self):
description = super().description
if isinstance(self.data_type, ArrayInput):
return (
description or ""
) + "\n\nParameter is an array, may be provided multiple times."
return description
class StructInput(common_rust.Struct):
field_type_class_: Type[common_rust.StructField] = StructInputField
clap_macros: set[str] = set()
original_data_type: BaseCompoundType | BaseCompoundType | None = None
is_group: bool = False
is_required: bool = False
@property
def imports(self):
imports: set[str] = {"serde::Deserialize"}
for field in self.fields.values():
imports.update(field.data_type.imports)
if self.additional_fields_type:
imports.add("crate::common::parse_key_val")
imports.update(self.additional_fields_type.imports)
return imports
class EnumGroupStructInputField(StructInputField):
"""Container for complex Enum field"""
sdk_parent_enum_variant: str | None = None
class EnumGroupStruct(common_rust.Struct):
"""Container for complex Enum containing Array"""
field_type_class_: Type[common_rust.StructField] = (
EnumGroupStructInputField
)
base_type: str = "struct"
sdk_enum_name: str
is_group: bool = True
is_required: bool = False
reference: model.Reference | None = None
class DictionaryInput(common_rust.Dictionary):
lifetimes: set[str] = set()
original_data_type: BaseCompoundType | BaseCompoundType | None = None
@property
def type_hint(self):
return f"Vec<(String, {self.value_type.type_hint})>"
@property
def imports(self):
imports = set()
if not isinstance(self.value_type, common_rust.Option):
imports.add("crate::common::parse_key_val")
else:
imports.add("crate::common::parse_key_val_opt")
imports.update(self.value_type.imports)
return imports
@property
def clap_macros(self):
macros = {"long", 'value_name="key=value"'}
if not isinstance(self.value_type, common_rust.Option):
macros.add(
"value_parser=parse_key_val::<String,"
f" {self.value_type.type_hint}>"
)
else:
macros.add(
"value_parser=parse_key_val_opt::<String,"
f" {self.value_type.item_type.type_hint}>"
)
return macros
class StringEnum(common_rust.StringEnum):
imports: set[str] = {"clap::ValueEnum"}
class ArrayInput(common_rust.Array):
original_data_type: (
common_rust.BaseCompoundType
| common_rust.BaseCombinedType
| common_rust.BasePrimitiveType
| None
) = None
@property
def clap_macros(self):
macros: set[str] = {"long", "action=clap::ArgAction::Append"}
macros.update(self.item_type.clap_macros)
if isinstance(self.item_type, ArrayInput):
macros.add("value_parser=crate::common::parse_json")
macros.add(
f'value_name="[{self.item_type.item_type.type_hint}] as JSON"'
)
return macros
class CommaSeparatedList(common_rust.CommaSeparatedList):
@property
def type_hint(self):
return f"Vec<{self.item_type.type_hint}>"
class RequestParameter(common_rust.RequestParameter):
"""OpenAPI request parameter in the Rust form"""
@property
def clap_macros(self):
macros: set[str] = set()
if not self.is_required:
macros.add("long")
if self.location == "path":
# Sometime there is a collision of path params and body params.
# In order to prevent this force clap arg ID to be prefixed, while
# the value_name is turned back to the expected value
macros.add(f'id = "path_param_{self.local_name}"')
macros.add(f'value_name = "{self.local_name.upper()}"')
macros.add('help_heading = "Path parameters"')
elif self.location == "query":
macros.update(self.data_type.clap_macros)
macros.add('help_heading = "Query parameters"')
if self.remote_name == "limit":
macros.remove("long")
macros.add('long("page-size")')
macros.add('visible_alias("limit")')
if hasattr(self.data_type, "enum") and self.data_type.enum:
values = ",".join(f'"{x}"' for x in sorted(self.data_type.enum))
macros.add(f"value_parser = [{values}]")
return f"#[arg({', '.join(sorted(macros))})]"
class RequestTypeManager(common_rust.TypeManager):
primitive_type_mapping: dict[
Type[model.PrimitiveType], Type[BasePrimitiveType]
] = {
model.PrimitiveString: String,
model.ConstraintString: String,
model.PrimitiveAny: JsonValue,
}
data_type_mapping: dict[
Type[model.ADT], Type[BaseCombinedType] | Type[BaseCompoundType]
]
data_type_mapping = {
model.Struct: StructInput,
model.Dictionary: DictionaryInput,
model.Array: ArrayInput,
model.CommaSeparatedList: ArrayInput,
model.Set: ArrayInput,
}
request_parameter_class: Type[common_rust.RequestParameter] = (
RequestParameter
)
string_enum_class = StringEnum
def get_local_attribute_name(self, name: str) -> str:
"""Get localized attribute name"""
name = name.replace(".", "_")
attr_name = "_".join(
x.lower() for x in re.split(common.SPLIT_NAME_RE, name)
)
if attr_name in ["type", "self", "enum", "ref", "default"]:
attr_name = f"_{attr_name}"
return attr_name
def get_remote_attribute_name(self, name: str) -> str:
"""Get the attribute name on the SDK side"""
return self.get_local_attribute_name(name)
def get_var_name_for(self, obj) -> str:
attr_name = "_".join(
x.lower() for x in re.split(common.SPLIT_NAME_RE, obj.name)
)
if attr_name in ["type", "self", "enum", "ref"]:
attr_name = f"_{attr_name}"
return attr_name
def _get_one_of_type(
self, type_model: model.OneOfType
) -> BaseCompoundType | BaseCombinedType | BasePrimitiveType:
"""Convert `model.OneOfType` into Rust model"""
result = super()._get_one_of_type(type_model)
# Field is of Enum type.
if isinstance(result, common_rust.Enum):
variant_classes = [
x.data_type.__class__ for x in result.kinds.values()
]
if (
StringEnum in variant_classes
and ArrayInput in variant_classes
and len(variant_classes) == 2
):
# There is a StringEnum and Array in the Enum. Clap cannot
# handle it so we convert StringEnum variants into flags
# and keep only rest
# This usecase is here at least to handle server.networks
# which are during creation `none`|`auto`|`JSON`
# On the SDK side where this method is not overriden there
# would be a naming conflict resulting in `set_models` call
# adding type name as a suffix.
# sdk_enum_name = result.name + result.__class__.__name__
sdk_enum_name = self.get_model_name(type_model.reference)
obj = EnumGroupStruct(
name=self.get_model_name(type_model.reference),
kinds={},
sdk_enum_name=sdk_enum_name,
reference=type_model.reference,
)
field_class = obj.field_type_class_
if not type_model.reference:
raise NotImplementedError
name = type_model.reference.name
for k, v in result.kinds.items():
if isinstance(v.data_type, common_rust.StringEnum):
for x in v.data_type.variants:
field = field_class(
local_name=f"{x.lower()}_{name}",
remote_name=f"{v.data_type.name}::{x}",
sdk_parent_enum_variant=f"{k}",
data_type=BooleanFlag(),
is_optional=False,
is_nullable=False,
)
obj.fields[field.local_name] = field
else:
field = field_class(
local_name=f"{name}",
remote_name=f"{k}",
data_type=v.data_type,
is_optional=True,
is_nullable=False,
)
obj.fields[field.local_name] = field
result = obj
return result
def convert_model(
self, type_model: model.PrimitiveType | model.ADT | model.Reference
) -> BasePrimitiveType | BaseCombinedType | BaseCompoundType:
"""Get local destination type from the ModelType"""
model_ref: model.Reference | None = None
typ: BasePrimitiveType | BaseCombinedType | BaseCompoundType | None = (
None
)
if isinstance(type_model, model.Reference):
model_ref = type_model
type_model = self._get_adt_by_reference(model_ref)
elif isinstance(type_model, model.ADT):
# Direct composite type
model_ref = type_model.reference
# CLI hacks
if isinstance(type_model, model.Struct) and not type_model.reference:
# Check the root structure
if len(type_model.fields) == 1:
# Struct with only 1 key
only_field = list(type_model.fields.keys())[0]
if isinstance(
type_model.fields[only_field].data_type,
model.PrimitiveNull,
):
# The only field is null. No input is necessary
logging.debug(
"API accepts only 1 field of type Null. No input is"
" required."
)
type_model.fields = {}
if isinstance(type_model, model.Array):
if isinstance(type_model.item_type, model.Reference):
item_type = self._get_adt_by_reference(type_model.item_type)
else:
item_type = type_model.item_type
if (
isinstance(item_type, model.Struct)
and len(item_type.fields.keys()) > 1
):
# An array of structs with more then 1 field
# Array of Structs can not be handled by the CLI (input).
# Therefore handle underlaying structure as Json saving
# reference to the original "expected" stuff to make final
# input conversion possible
original_data_type = self.convert_model(item_type)
# We are not interested to see unused data in the submodels
self.ignored_models.append(item_type)
# self.ignored_models.extend(
# x.data_type for x in item_type.fields.values()
# )
typ = self.data_type_mapping[model.Array](
description=common_rust.sanitize_rust_docstrings(
type_model.description
),
original_data_type=original_data_type,
item_type=JsonValue(),
)
elif isinstance(type_model, model.Dictionary):
if isinstance(type_model.value_type, model.Dictionary):
original_data_type = self.convert_model(type_model.value_type)
typ = JsonValue(
original_data_type=DictionaryInput(
name=self.get_model_name(type_model.reference),
value_type=original_data_type,
)
)
else:
if isinstance(type_model.value_type, model.Struct):
# Placement has dict of structs of dicts of structs ...
# Only simplify the top level stuff
original_data_type = self.convert_model(
type_model.value_type
)
typ = DictionaryInput(
name=self.get_model_name(type_model.reference),
description=type_model.value_type.description,
value_type=JsonValue(
original_data_type=original_data_type
),
)
if not model_ref:
model_ref = model.Reference(
name=self.root_name, type=typ.__class__
)
if type_model.value_type.reference:
self.ignored_models.append(
type_model.value_type.reference
)
if typ:
if model_ref:
self.refs[model_ref] = typ
else:
# Not hacked anything, invoke superior method
typ = super().convert_model(type_model)
return typ
def _get_struct_type(self, type_model: model.Struct) -> common_rust.Struct:
"""Convert model.Struct into rust_cli `Struct`"""
struct_class = self.data_type_mapping[model.Struct]
mod = struct_class(
name=self.get_model_name(type_model.reference),
description=common_rust.sanitize_rust_docstrings(
type_model.description
),
)
field_class = mod.field_type_class_
for field_name, field in type_model.fields.items():
is_nullable: bool = False
field_data_type = self.convert_model(field.data_type)
if (
field_name
in ["password", "original_password", "secret", "passcode"]
or self.get_model_name(type_model.reference) == "Token"
and field_name == "id"
) and isinstance(field_data_type, String):
field_data_type = SecretString(format=field_data_type.format)
if isinstance(field_data_type, self.option_type_class):
# Unwrap Option into "is_nullable"
# NOTE: but perhaps Option<Option> is better (not set vs set
# explicitly to None)
is_nullable = True
if isinstance(
field_data_type.item_type,
(common_rust.Array, DictionaryInput, String),
) or (
mod.name != "Request"
and isinstance(field_data_type.item_type, StructInput)
):
# Unwrap Option<Option<...>>
field_data_type = field_data_type.item_type
elif isinstance(field_data_type, EnumGroupStruct):
field_data_type.is_required = field.is_required
elif (
# is Dictionary
isinstance(field_data_type, DictionaryInput)
# of Primitives
and not isinstance(
field_data_type.value_type, common_rust.BasePrimitiveType
)
and not (
# and not Option<Primitive>
isinstance(
field_data_type.value_type, self.option_type_class
)
and isinstance(
field_data_type.value_type.item_type,
common_rust.BasePrimitiveType,
)
)
):
dict_type_model = self._get_adt_by_reference(field.data_type)
simplified_data_type = JsonValue()
simplified_data_type.original_data_type = (
field_data_type.value_type
)
field_data_type.value_type = simplified_data_type
self.ignored_models.append(
dict_type_model.value_type.reference
)
elif isinstance(field_data_type, StructInput):
# Check if one of the sub fields has same attribute name as in
# the current struct. Ideally this should not ever happen, but
# i.e. image.namespace.property has the case
intersect = set(type_model.fields.keys()).intersection(
set(field_data_type.fields.keys())
)
if intersect:
# Na well, it is such a rare case that it does not make
# much sense to start renaming fields. Instead conver
# substruct to be a JsonValue
simplified_data_type = JsonValue()
simplified_data_type.original_data_type = field_data_type
field_data_type = simplified_data_type
self.ignored_models.append(field.data_type)
f = field_class(
local_name=self.get_local_attribute_name(field_name),
remote_name=self.get_remote_attribute_name(field_name),
description=common_rust.sanitize_rust_docstrings(
field.description
),
data_type=field_data_type,
is_optional=not field.is_required,
is_nullable=is_nullable,
)
if mod.name != "Request" and (
isinstance(field_data_type, struct_class)
):
field_data_type.is_group = True
field_data_type.is_required = field.is_required
if isinstance(field_data_type, self.option_type_class):
f.is_nullable = True
mod.fields[field_name] = f
# Repeat additional_fields handling as in
# common/rust.py
if type_model.additional_fields:
definition = type_model.additional_fields
# Structure allows additional fields
if isinstance(definition, bool):
mod.additional_fields_type = self.primitive_type_mapping[
model.PrimitiveAny
]
else:
mod.additional_fields_type = self.convert_model(definition)
return mod
def _get_array_type(self, type_model: model.Array) -> common_rust.Array:
"""Convert `model.Array` into corresponding Rust model"""
item_type = self.convert_model(type_model.item_type)
struct_class = self.data_type_mapping[model.Struct]
# item_ref: model.Reference | None = None
# if isinstance(type_model.item_type, model.Reference):
# item_ref = type_model.item_type
# elif hasattr(type_model.item_type, "reference"):
# item_ref = type_model.item_type.reference
if isinstance(item_type, struct_class):
if len(item_type.fields.keys()) == 1:
# Server.security_groups is an object with only name -> simplify
# Only simplify structure with single simple property and name !=
# "Request" (root request)
only_field_name = list(item_type.fields.keys())[0]
only_field = item_type.fields[only_field_name]
if not isinstance(only_field.data_type, StructInput):
# If there is only single field in the struct and it is not a
# new struct simplify it.
simplified_data_type = only_field.data_type.model_copy()
simplified_data_type.original_data_type = item_type
logging.debug(
"Replacing single field object %s with %s",
type_model.item_type,
simplified_data_type,
)
self.ignored_models.append(type_model.item_type)
item_type = simplified_data_type
elif isinstance(item_type, DictionaryInput):
# Array of Freestyle objects in CLI can be only represented as
# array of JsonValue
simplified_data_type = JsonValue()
simplified_data_type.original_data_type = item_type
# self.ignored_models.append(item_ref)
item_type = simplified_data_type
return self.data_type_mapping[model.Array](
name=self.get_model_name(type_model.reference), item_type=item_type
)
def set_parameters(self, parameters: list[model.RequestParameter]) -> None:
"""Set OpenAPI operation parameters into typemanager for conversion"""
super().set_parameters(parameters)
for k, param in self.parameters.items():
if param.is_flag:
param.data_type = BooleanFlag(
original_data_type=param.data_type, **param.model_dump()
)
self.parameters[k] = param
class RustCliGenerator(BaseGenerator):
def __init__(self):
super().__init__()
def _format_code(self, *args):
"""Format code using Rustfmt
:param *args: Path to the code to format
"""
for path in args:
subprocess.run(["rustfmt", "--edition", "2024", path])
def get_parser(self, parser):
parser.add_argument(
"--operation-type",
choices=[
"list",
"show",
"create",
"set",
"action",
"delete",
"download",
"upload",
"json",
],
help="Rust CLI Command type (only for rust-cli target)",
)
parser.add_argument(
"--command-name",
help="Rust CLI Command name (used as final module name)",
)
parser.add_argument(
"--cli-mod-path",
help=(
"Mod path (dot separated) of the corresponding SDK command"
" (when non standard)"
),
)
parser.add_argument(
"--sdk-mod-path",
help=(
"Mod path (dot separated) of the corresponding SDK command"
" (when non standard)"
),
)
parser.add_argument(
"--tests", action="store_true", help="Generate tests"
)
return parser
def _render_command(
self, context: dict, impl_template: str, impl_dest: Path
):
"""Render command code"""
self._render(impl_template, context, impl_dest.parent, impl_dest.name)
def generate(
self, res, target_dir, openapi_spec=None, operation_id=None, args=None
):
"""Generate code for the Rust openstack_cli"""
logging.debug(
f"Generating Rust CLI code for `{operation_id}` in {target_dir}"
)
work_dir = Path(target_dir, "rust", "openstack_cli", "src")
if not openapi_spec:
openapi_spec = common.get_openapi_spec(args.openapi_yaml_spec)
if not operation_id:
operation_id = args.openapi_operation_id
(path, method, spec) = common.find_openapi_operation(
openapi_spec, operation_id
)
_, res_name = res.split(".") if res else (None, None)
resource_name = common.get_resource_names_from_url(path)[-1]
openapi_parser = model.OpenAPISchemaParser()
operation_params: list[model.RequestParameter] = []
sdk_mod_path_base = common.get_rust_sdk_mod_path(
args.service_type, args.api_version, args.module_path or path
)
cli_mod_path = common.get_rust_cli_mod_path(
args.service_type, args.api_version, args.module_path or path
)
types_mod_path = common.get_rust_types_mod_path(
args.service_type, args.api_version, args.module_path or path
)
target_class_name = resource_name
response_class_name: str | None = (
"".join(
x.capitalize()
for x in re.split(common.SPLIT_NAME_RE, resource_name)
)
+ "Response"
)
is_image_download: bool = False
is_json_patch: bool = False
global_additional_imports: set[str] = set()
# Collect all operation parameters
for param in openapi_spec["paths"][path].get(
"parameters", []
) + spec.get("parameters", []):
param_ = openapi_parser.parse_parameter(param)
if (
("{" + param["name"] + "}") in path and param["in"] == "path"
) or param["in"] != "path":
# Respect path params that appear in path and not path params
# param_ = openapi_parser.parse_parameter(param)
if param_.name in [
f"{resource_name}_id",
f"{resource_name.replace('_', '')}_id",
]:
# for i.e. routers/{router_id} we want local_name to be `id` and not `router_id`
param_.name = "id"
operation_params.append(param_)
# If there is resource_link set on parameter add imports used to do a lookup
if param_.resource_link:
link_res_name: str = param_.resource_link.split(".")[0]
global_additional_imports.add("tracing::warn")
global_additional_imports.add(
"openstack_sdk::api::find_by_name"
)
global_additional_imports.add("openstack_sdk::api::QueryAsync")
global_additional_imports.add(
f"openstack_sdk::api::{'::'.join(link_res_name.split('/'))}::find"
f" as find_{link_res_name.split('/')[-1]}"
)
global_additional_imports.add("eyre::OptionExt")
global_additional_imports.add("eyre::eyre")
# List of operation variants (based on the body)
operation_variants = common.get_operation_variants(
spec, action_name=args.action_name
)
body_types: list[str] = []
last_path_parameter: RequestParameter | None = None
if (
args.operation_type == "download"
and path == "/v2/images/{image_id}/file"
):
is_image_download = True
if args.operation_type == "upload":
# collect registered media types for upload operation
request_body = spec.get("requestBody")
content = request_body.get("content", {})
body_types = list(content.keys())
for operation_variant in operation_variants:
logging.debug(f"Processing variant {operation_variant}")
additional_imports = set(global_additional_imports)
type_manager: common_rust.TypeManager = RequestTypeManager()
result_is_list: bool = False
is_list_paginated: bool = False
if operation_params:
type_manager.set_parameters(operation_params)
mod_name = "_".join(
x.lower()
for x in re.split(
common.SPLIT_NAME_RE,
(
args.module_name
or args.operation_name
or args.operation_type
or method
),
)
)
operation_body = operation_variant.get("body")
microversion: str | None = None
mod_suffix: str = ""
request_types = None
if operation_body:
min_ver = operation_body.get("x-openstack", {}).get("min-ver")
if min_ver:
mod_suffix = "_" + min_ver.replace(".", "")
microversion = min_ver
(_, request_types) = openapi_parser.parse(
operation_body, ignore_read_only=True
)
# Certain hacks
for parsed_type in list(request_types):
# iterate over listed request_types since we want to modify list
if resource_name == "server" and method.lower() == "post":
# server declares OS-SCH-HNT:scheduler_hints as
# "alias" for normal scheduler hints, but the whole
# struct is there. For the cli it makes no sense and
# we filter it out from the parsed data
object_to_remove = "OS-SCH-HNT:scheduler_hints"
if (
parsed_type.reference
and parsed_type.reference.name == object_to_remove
and parsed_type.reference.type == model.Struct
):
request_types.remove(parsed_type)
elif parsed_type.reference is None and isinstance(
parsed_type, model.Struct
):
parsed_type.fields.pop(object_to_remove, None)
# and feed them into the TypeManager
type_manager.set_models(request_types)
sdk_mod_path: list[str] = sdk_mod_path_base.copy()
sdk_mod_path.append((args.sdk_mod_name or mod_name) + mod_suffix)
types_mod_path = common.get_rust_types_mod_path(
args.service_type, args.api_version, args.module_path or path
)
types_mod_path.append("response")
types_mod_path.append(args.sdk_mod_name or mod_name)
mod_name += mod_suffix
result_def: dict = {}
response_def: dict | None = {}
resource_header_metadata: dict = {}
# Process response information
# # Prepare information about response
if method.upper() != "HEAD":
response = common.find_response_schema(
spec["responses"],
args.response_key or resource_name,
args.action_name,
)
if response:
response_key: str | None
if args.response_key:
response_key = (
args.response_key
if args.response_key != "null"
else None
)
else:
response_key = resource_name
response_def, response_key = common.find_resource_schema(
response, None, response_key
)
if not response_def and response.get("type") == "string":
response_def = response
if response_def:
if response_def.get("type", "object") == "object" or (
# BS metadata is defined with type: ["object",
# "null"]
isinstance(response_def.get("type"), list)
and "object" in response_def["type"]
):
(root, response_types) = openapi_parser.parse(
response_def
)
if not isinstance(root, model.Dictionary):
if method == "patch" and not request_types:
# image patch is a jsonpatch based operation
# where there is no request. For it we need to
# look at the response and get writable
# parameters as a base
is_json_patch = True
if not args.find_implemented_by_sdk:
raise NotImplementedError
additional_imports.update(
[
"json_patch::{Patch, diff}",
"serde_json::json",
]
)
additional_imports.add(
f"openstack_types::"
+ "::".join(types_mod_path)
+ "::*"
)
(_, response_types) = openapi_parser.parse(
response_def, ignore_read_only=True
)
type_manager.set_models(response_types)
elif response_def["type"] == "string":
(root_dt, _) = openapi_parser.parse(response_def)
if not root_dt:
raise RuntimeError(
"Response data can not be processed"
)
response_props = response.get("properties", {})
if response_props and (
(
response_key
and response_props.get(response_key, {}).get(
"type"
)
== "array"
)
or response_props[
list(response_props.keys())[0]
].get("type")
== "array"
):
result_is_list = True
mod_response_path = "openstack_types::" + "::".join(
[
f"r#{x}" if x in ["trait", "type"] else x
for x in types_mod_path
]
+ [response_class_name]
)
additional_imports.add(mod_response_path)
else:
response_class_name = None
else:
response_class_name = None
mod_import_name = "openstack_sdk::api::" + "::".join(
f"r#{x}" if x in ["trait", "type"] else x
for x in sdk_mod_path
)
if not (
args.find_implemented_by_sdk
and args.operation_type in ["show", "download"]
):
additional_imports.add(mod_import_name)
if args.find_implemented_by_sdk and args.operation_type in [
"show",
"set",
"download",
]:
additional_imports.add("openstack_sdk::api::find")
additional_imports.add(
"::".join(
[
"openstack_sdk::api",
"::".join(
f"r#{x}" if x in ["trait", "type"] else x
for x in sdk_mod_path[:-1]
),
"find",
]
)
)
if args.operation_type == "list":
# Make plural form for listing
target_class_name = common.get_plural_form(
target_class_name
)
if "limit" in [
k for (k, _) in type_manager.get_parameters("query")
]:
is_list_paginated = True
additional_imports.add(
"openstack_sdk::api::{paged, Pagination}"
)
if args.operation_type == "download":
additional_imports.add("crate::common::download_file")
if args.operation_type == "upload":
additional_imports.add(
"crate::common::build_upload_asyncread"
)
additional_imports.add("openstack_sdk::api::RawQueryAsync")
additional_imports.add("openstack_sdk::api::QueryAsync")
if resource_header_metadata:
additional_imports.add("openstack_sdk::api::RawQueryAsync")
additional_imports.add("http::Response")
additional_imports.add("bytes::Bytes")
if resource_header_metadata:
additional_imports.add(
"crate::common::HashMapStringString"
)
additional_imports.add("std::collections::HashMap")
if (
len(
[
x
for x in resource_header_metadata.keys()
if "*" in x
]
)
> 0
):
additional_imports.add("regex::Regex")
if is_image_download:
additional_imports.add("openstack_sdk::api::find")
additional_imports.add("openstack_sdk::api::QueryAsync")
additional_imports.add("openstack_sdk::api::RawQueryAsync")
additional_imports.add(
"::".join(
[
"openstack_sdk::api",
"::".join(sdk_mod_path[:-2]),
"find",
]
)
)
# Discard unnecessry imports
additional_imports.discard("http::Response")
additional_imports.discard("bytes::Bytes")
additional_imports.update(type_manager.get_imports())
# additional_imports.update(response_type_manager.get_imports())
# Deserialize is already in template since it is uncoditionally required
additional_imports.discard("serde::Deserialize")
additional_imports.discard("serde::Serialize")
command_description: str = spec.get("description")
command_summary: str = spec.get("summary")
if args.operation_type == "action":
command_description = operation_body.get(
"description", command_description
)
command_summary = operation_body.get(
"summary", command_summary
)
if command_summary and microversion:
command_summary += f" (microversion = {microversion})"
if not command_description:
command_description = (
"Command without description in OpenAPI"
)
context = {
"operation_id": operation_id,
"operation_type": args.operation_type,
"operation_name": (
(args.operation_name or args.operation_type).lower()
),
"command_description": (
common_rust.sanitize_rust_docstrings(
command_description
)
),
"command_summary": common_rust.sanitize_rust_docstrings(
command_summary
),
"type_manager": type_manager,
"resource_name": resource_name,
"target_class_name": "".join(
x.title() for x in target_class_name.split("_")
),
"sdk_struct_name": "Request",
"sdk_service_name": common.get_rust_service_type_from_str(
args.service_type
),
"service_type": args.service_type,
"url": path[1:] if path.startswith("/") else path,
"method": method,
"resource_header_metadata": resource_header_metadata,
"sdk_mod_path": sdk_mod_path,
"cli_mod_path": cli_mod_path,
"result_def": result_def,
# Last path param is required for the download operation
"last_path_parameter": last_path_parameter,
"body_types": body_types,
"additional_imports": additional_imports,
"find_present": args.find_implemented_by_sdk,
"microversion": microversion,
"result_is_list": result_is_list,
"is_image_download": is_image_download,
"is_json_patch": is_json_patch,
"is_list_paginated": is_list_paginated,
"response_class_name": response_class_name,
"types_mod_path": types_mod_path,
"resource_key": res,
}
if not args.cli_mod_path:
# mod_name = args.operation_name or args.operation_type.value
impl_path = Path(
work_dir, "/".join(cli_mod_path), f"{mod_name}.rs"
)
self._render_command(context, "rust_cli/impl.rs.j2", impl_path)
self._format_code(impl_path)
if args.cli_full_command and True: # args.tests:
impl_path = Path(
work_dir.parent,
"tests",
"/".join(cli_mod_path),
f"{mod_name}_autogen.rs",
)
cmd = args.cli_full_command
if microversion:
cmd = args.cli_full_command + microversion.replace(
".", ""
)
test_context = {
"service_type": args.service_type,
"command": cmd.split(" "),
}
self._render_command(
test_context,
"rust_cli/functional_test_impl.rs.j2",
impl_path,
)
self._format_code(impl_path)
yield (cli_mod_path, mod_name, path, None)