Sadly there is no reasonable way how to extract information about supported query parameters ouf ot Octavia code. Moreover there is even no documentation about that except of basic statement "The Octavia API v2 supports filtering based on all top level attributes of a resource" which is only partially true. So for now hardcode those parameter schemas as known by OpenStackSDK (with exception of definitely wrong ones). Change-Id: I3684a6e7d31c8ac8842049373291b234574590aa
1432 lines
57 KiB
Python
1432 lines
57 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
import abc
|
|
import copy
|
|
import datetime
|
|
import enum
|
|
import importlib
|
|
import inspect
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Literal
|
|
import re
|
|
|
|
from codegenerator.common.schema import ParameterSchema
|
|
from codegenerator.common.schema import PathSchema
|
|
from codegenerator.common.schema import SpecSchema
|
|
from codegenerator.common.schema import TypeSchema
|
|
from codegenerator.openapi.utils import rst_to_md
|
|
from openapi_core import Spec
|
|
from ruamel.yaml.scalarstring import LiteralScalarString
|
|
from ruamel.yaml import YAML
|
|
from wsme import types as wtypes
|
|
|
|
|
|
VERSION_RE = re.compile(r"[Vv][0-9\.]*")
|
|
|
|
|
|
# Workaround Python's lack of an undefined sentinel
|
|
# https://python-patterns.guide/python/sentinel-object/
|
|
class Unset:
|
|
def __bool__(self) -> Literal[False]:
|
|
return False
|
|
|
|
|
|
UNSET: Unset = Unset()
|
|
|
|
|
|
def get_referred_type_data(func, name: str):
|
|
"""Get python type object referred by the function
|
|
|
|
Return `some.object` for a function like:
|
|
|
|
@wsgi.validation(some.object)
|
|
def foo():
|
|
pass
|
|
|
|
:param func: Function
|
|
:param str name: object name
|
|
"""
|
|
module = inspect.getmodule(func)
|
|
if module:
|
|
(mod, obj) = (None, None)
|
|
if "." in name:
|
|
(mod, obj) = name.split(".")
|
|
else:
|
|
raise RuntimeError('No "." in %s', name)
|
|
m = importlib.import_module(module.__name__)
|
|
if hasattr(m, mod):
|
|
mod = getattr(m, mod)
|
|
else:
|
|
raise RuntimeError("Cannot find attr %s", name)
|
|
if hasattr(mod, obj):
|
|
return getattr(mod, obj)
|
|
else:
|
|
raise RuntimeError("Cannot find definition for %s", name)
|
|
else:
|
|
raise RuntimeError("Cannot get module the function was defined in")
|
|
|
|
|
|
class OpenStackServerSourceBase:
|
|
# A URL to Operation tag (OpenApi group) mapping. Can be used when first
|
|
# non parameter path element grouping is not enough
|
|
# ("/qos/policies/{policy_id}/packet_rate_limit_rules" should be
|
|
# "qos-packet-rate-limit-rules" instead of "qos")
|
|
URL_TAG_MAP: dict[str, str] = {}
|
|
|
|
def _api_ver_major(self, ver):
|
|
return ver.ver_major
|
|
|
|
def _api_ver_minor(self, ver):
|
|
return ver.ver_minor
|
|
|
|
def _api_ver(self, ver):
|
|
return (ver.ver_major, ver.ver_minor)
|
|
|
|
def useFixture(self, fixture):
|
|
try:
|
|
fixture.setUp()
|
|
except Exception as ex:
|
|
logging.exception("Got exception", ex)
|
|
else:
|
|
return fixture
|
|
|
|
@abc.abstractmethod
|
|
def generate(self, target_dir, args) -> Path:
|
|
pass
|
|
|
|
def load_openapi(self, path):
|
|
"""Load existing OpenAPI spec from the file"""
|
|
if not path.exists():
|
|
return
|
|
yaml = YAML(typ="safe")
|
|
yaml.preserve_quotes = True
|
|
with open(path) as fp:
|
|
spec = yaml.load(fp)
|
|
|
|
return SpecSchema(**spec)
|
|
|
|
def dump_openapi(self, spec, path, validate=False):
|
|
"""Dump OpenAPI spec into the file"""
|
|
if validate:
|
|
self.validate_spec(spec)
|
|
yaml = YAML()
|
|
yaml.preserve_quotes = True
|
|
yaml.indent(mapping=2, sequence=4, offset=2)
|
|
with open(path, "w") as fp:
|
|
yaml.dump(
|
|
spec.model_dump(
|
|
exclude_none=True, exclude_defaults=True, by_alias=True
|
|
),
|
|
fp,
|
|
)
|
|
|
|
def validate_spec(self, openapi_spec):
|
|
Spec.from_dict(
|
|
openapi_spec.model_dump(
|
|
exclude_none=True, exclude_defaults=True, by_alias=True
|
|
)
|
|
)
|
|
|
|
def _sanitize_param_ver_info(self, openapi_spec, min_api_version):
|
|
# Remove min_version of params if it matches to min_api_version
|
|
for k, v in openapi_spec.components.parameters.items():
|
|
os_ext = v.openstack
|
|
if os_ext:
|
|
if os_ext.get("min-ver") == min_api_version:
|
|
v.openstack.pop("min-ver")
|
|
if "max_ver" in os_ext and os_ext["max-ver"] is None:
|
|
v.openstack.pop("max-ver")
|
|
if os_ext == {}:
|
|
v.openstack = None
|
|
|
|
def _process_route(
|
|
self, route, openapi_spec, ver_prefix=None, framework=None
|
|
):
|
|
# Placement exposes "action" as controller in route defaults, all others - "controller"
|
|
if not ("controller" in route.defaults or "action" in route.defaults):
|
|
return
|
|
if "action" in route.defaults and "_methods" in route.defaults:
|
|
# placement 405 handler
|
|
return
|
|
# Path can be "/servers/{id}", but can be
|
|
# "/volumes/:volume_id/types/:(id)" - process
|
|
# according to the routes lib logic
|
|
path = ver_prefix if ver_prefix else ""
|
|
operation_spec = None
|
|
for part in route.routelist:
|
|
if isinstance(part, dict):
|
|
path += "{" + part["name"] + "}"
|
|
else:
|
|
path += part
|
|
|
|
if path == "":
|
|
# placement has "" path - see weird explanation in the placement source code
|
|
return
|
|
|
|
# if "method" not in route.conditions:
|
|
# raise RuntimeError("Method not set for %s", route)
|
|
method = (
|
|
route.conditions.get("method", "GET")[0]
|
|
if route.conditions
|
|
else "GET"
|
|
)
|
|
|
|
controller = route.defaults.get("controller")
|
|
action = route.defaults.get("action")
|
|
logging.info(
|
|
"Path: %s; method: %s; operation: %s", path, method, action
|
|
)
|
|
|
|
versioned_methods = {}
|
|
controller_actions = {}
|
|
framework = None
|
|
if hasattr(controller, "controller"):
|
|
# wsgi
|
|
framework = "wsgi"
|
|
contr = controller.controller
|
|
if hasattr(contr, "versioned_methods"):
|
|
versioned_methods = contr.versioned_methods
|
|
if hasattr(contr, "wsgi_actions"):
|
|
controller_actions = contr.wsgi_actions
|
|
if hasattr(controller, "wsgi_actions"):
|
|
# Nova flavors mess with wsgi_action instead of normal operation
|
|
# and actions on the wrong controller
|
|
parent_controller_actions = controller.wsgi_actions
|
|
if parent_controller_actions:
|
|
controller_actions.update(parent_controller_actions)
|
|
elif hasattr(controller, "_pecan") or framework == "pecan":
|
|
# Pecan base app
|
|
framework = "pecan"
|
|
contr = controller
|
|
|
|
else:
|
|
raise RuntimeError(f"Unsupported controller {controller}")
|
|
# logging.debug("Actions: %s, Versioned methods: %s", actions, versioned_methods)
|
|
|
|
# path_spec = openapi_spec.paths.setdefault(path, PathSchema())
|
|
|
|
# operation_spec = dict() #= getattr(path_spec, method.lower()) # , {})
|
|
# Get Path elements
|
|
path_elements: list[str] = list(filter(None, path.split("/")))
|
|
if path_elements and VERSION_RE.match(path_elements[0]):
|
|
path_elements.pop(0)
|
|
operation_tags = self._get_tags_for_url(path)
|
|
|
|
# Build path parameters (/foo/{foo_id}/bar/{id} => $foo_id, $foo_bar_id)
|
|
# Since for same path we are here multiple times check presence of
|
|
# parameter before adding new params
|
|
path_params: list[ParameterSchema] = []
|
|
path_resource_names: list[str] = [
|
|
x.replace("-", "_")
|
|
for x in filter(lambda x: not x.startswith("{"), path_elements)
|
|
]
|
|
for path_element in path_elements:
|
|
if "{" in path_element:
|
|
param_name = path_element.strip("{}")
|
|
global_param_name = (
|
|
"_".join(path_resource_names) + f"_{param_name}"
|
|
)
|
|
|
|
param_ref_name = self._get_param_ref(
|
|
openapi_spec,
|
|
global_param_name,
|
|
param_name,
|
|
param_location="path",
|
|
path=path,
|
|
)
|
|
# Ensure reference to the param is in the path_params
|
|
if param_ref_name not in [k.ref for k in list(path_params)]:
|
|
path_params.append(ParameterSchema(ref=param_ref_name))
|
|
# Cleanup path_resource_names
|
|
# if len(path_resource_names) > 0 and VERSION_RE.match(path_resource_names[0]):
|
|
# # We should not have version prefix in the path_resource_names
|
|
# path_resource_names.pop(0)
|
|
if len(path_resource_names) == 0:
|
|
path_resource_names.append("root")
|
|
elif path_elements[-1].startswith("{"):
|
|
rn = path_resource_names[-1]
|
|
if rn.endswith("ies"):
|
|
rn = rn.replace("ies", "y")
|
|
elif rn.endswith("sses"):
|
|
rn = rn[:-2]
|
|
elif rn.endswith("statuses"):
|
|
rn = rn[:-2]
|
|
else:
|
|
rn = rn.rstrip("s")
|
|
path_resource_names[-1] = rn
|
|
|
|
# Set operationId
|
|
operation_id = re.sub(
|
|
r"^(/?v[0-9.]*/)",
|
|
"",
|
|
"/".join([x.strip("{}") for x in path_elements])
|
|
+ f":{method.lower()}", # noqa
|
|
)
|
|
|
|
if action in versioned_methods:
|
|
# Normal REST operation with version bounds
|
|
(start_version, end_version) = (None, None)
|
|
|
|
# if len(versioned_methods[action]) > 1:
|
|
# for m in versioned_methods[action]:
|
|
# raise RuntimeError("Multiple versioned methods for action %s:%s: %s", path, action, versioned_methods[action])
|
|
for versioned_method in sorted(
|
|
versioned_methods[action], key=lambda v: v.start_version
|
|
):
|
|
start_version = versioned_method.start_version
|
|
end_version = versioned_method.end_version
|
|
func = versioned_method.func
|
|
|
|
# Get the path/op spec only when we have
|
|
# something to fill in
|
|
path_spec = openapi_spec.paths.setdefault(
|
|
path, PathSchema(parameters=path_params)
|
|
)
|
|
operation_spec = getattr(path_spec, method.lower())
|
|
if not operation_spec.operationId:
|
|
operation_spec.operationId = operation_id
|
|
operation_spec.tags.extend(operation_tags)
|
|
operation_spec.tags = list(set(operation_spec.tags))
|
|
|
|
self.process_operation(
|
|
func,
|
|
openapi_spec,
|
|
operation_spec,
|
|
path_resource_names,
|
|
controller=controller,
|
|
method=method,
|
|
operation_name=action,
|
|
start_version=start_version,
|
|
end_version=end_version,
|
|
)
|
|
elif action and hasattr(contr, action):
|
|
# Normal REST operation without version bounds
|
|
func = getattr(contr, action)
|
|
|
|
# Get the path/op spec only when we have
|
|
# something to fill in
|
|
path_spec = openapi_spec.paths.setdefault(
|
|
path, PathSchema(parameters=path_params)
|
|
)
|
|
operation_spec = getattr(path_spec, method.lower())
|
|
if not operation_spec.operationId:
|
|
operation_spec.operationId = operation_id
|
|
operation_spec.tags.extend(operation_tags)
|
|
operation_spec.tags = list(set(operation_spec.tags))
|
|
|
|
self.process_operation(
|
|
func,
|
|
openapi_spec,
|
|
operation_spec,
|
|
path_resource_names,
|
|
controller=controller,
|
|
operation_name=action,
|
|
method=method,
|
|
path=path,
|
|
)
|
|
elif action != "action" and action in controller_actions:
|
|
# Normal REST operation without version bounds and present in
|
|
# wsgi_actions of child or parent controller. Example is
|
|
# compute.flavor.create/update which are exposed as wsgi actions
|
|
# (BUG)
|
|
func = controller_actions[action]
|
|
|
|
# Get the path/op spec only when we have
|
|
# something to fill in
|
|
path_spec = openapi_spec.paths.setdefault(
|
|
path, PathSchema(parameters=path_params)
|
|
)
|
|
operation_spec = getattr(path_spec, method.lower())
|
|
if not operation_spec.operationId:
|
|
operation_spec.operationId = operation_id
|
|
operation_spec.tags.extend(operation_tags)
|
|
operation_spec.tags = list(set(operation_spec.tags))
|
|
|
|
self.process_operation(
|
|
func,
|
|
openapi_spec,
|
|
operation_spec,
|
|
path_resource_names,
|
|
controller=controller,
|
|
operation_name=action,
|
|
method=method,
|
|
path=path,
|
|
)
|
|
|
|
elif (
|
|
controller_actions and action == "action"
|
|
): # and action in controller_actions:
|
|
# There are ACTIONS present on the controller
|
|
for action, op_name in controller_actions.items():
|
|
logging.info("Action %s: %s", action, op_name)
|
|
(start_version, end_version) = (None, None)
|
|
action_impls: list[
|
|
tuple[Callable, str | None, str | None]
|
|
] = []
|
|
if isinstance(op_name, str):
|
|
# wsgi action value is a string
|
|
if op_name in versioned_methods:
|
|
# ACTION with version bounds
|
|
for ver_method in versioned_methods[op_name]:
|
|
action_impls.append(
|
|
(
|
|
ver_method.func,
|
|
ver_method.start_version,
|
|
ver_method.end_version,
|
|
)
|
|
)
|
|
logging.info(
|
|
"Versioned action %s", ver_method.func
|
|
)
|
|
elif hasattr(contr, op_name):
|
|
# ACTION with no version bounds
|
|
func = getattr(contr, op_name)
|
|
action_impls.append((func, None, None))
|
|
logging.info("Unversioned action %s", func)
|
|
else:
|
|
logging.error(
|
|
"Cannot find code for %s:%s:%s [%s]",
|
|
path,
|
|
method,
|
|
action,
|
|
dir(contr),
|
|
)
|
|
continue
|
|
elif callable(op_name):
|
|
# Action is already a function (compute.flavors)
|
|
closurevars = inspect.getclosurevars(op_name)
|
|
# Versioned actions in nova can be themelves as a
|
|
# version_select wrapped callable (i.e. baremetal.action)
|
|
key = closurevars.nonlocals.get("key", None)
|
|
slf = closurevars.nonlocals.get("self", None)
|
|
|
|
if key and key in versioned_methods:
|
|
# ACTION with version bounds
|
|
if len(versioned_methods[key]) > 1:
|
|
logging.warning(
|
|
f"There are multiple callables for action {key} instead of multiple bodies"
|
|
)
|
|
for ver_method in versioned_methods[key]:
|
|
action_impls.append(
|
|
(
|
|
ver_method.func,
|
|
ver_method.start_version,
|
|
ver_method.end_version,
|
|
)
|
|
)
|
|
logging.info(
|
|
"Versioned action %s", ver_method.func
|
|
)
|
|
elif slf and key:
|
|
vm = getattr(slf, "versioned_methods", None)
|
|
if vm and key in vm:
|
|
# ACTION with version bounds
|
|
if len(vm[key]) > 1:
|
|
raise RuntimeError(
|
|
"Multiple versioned methods for action %s",
|
|
action,
|
|
)
|
|
for ver_method in vm[key]:
|
|
action_impls.append(
|
|
(
|
|
ver_method.func,
|
|
ver_method.start_version,
|
|
ver_method.end_version,
|
|
)
|
|
)
|
|
logging.info(
|
|
"Versioned action %s", ver_method.func
|
|
)
|
|
else:
|
|
action_impls.append((op_name, None, None))
|
|
|
|
# Get the path/op spec only when we have
|
|
# something to fill in
|
|
path_spec = openapi_spec.paths.setdefault(
|
|
path, PathSchema(parameters=path_params)
|
|
)
|
|
operation_spec = getattr(path_spec, method.lower())
|
|
if not operation_spec.operationId:
|
|
operation_spec.operationId = operation_id
|
|
operation_spec.tags.extend(operation_tags)
|
|
operation_spec.tags = list(set(operation_spec.tags))
|
|
|
|
for func, start_version, end_version in action_impls:
|
|
self.process_operation(
|
|
func,
|
|
openapi_spec,
|
|
operation_spec,
|
|
path_resource_names,
|
|
controller=controller,
|
|
operation_name=action,
|
|
method=method,
|
|
start_version=start_version,
|
|
end_version=end_version,
|
|
mode="action",
|
|
path=path,
|
|
)
|
|
elif framework == "pecan":
|
|
if callable(controller):
|
|
func = controller
|
|
# Get the path/op spec only when we have
|
|
# something to fill in
|
|
path_spec = openapi_spec.paths.setdefault(
|
|
path, PathSchema(parameters=path_params)
|
|
)
|
|
operation_spec = getattr(path_spec, method.lower())
|
|
if not operation_spec.operationId:
|
|
operation_spec.operationId = operation_id
|
|
operation_spec.tags.extend(operation_tags)
|
|
operation_spec.tags = list(set(operation_spec.tags))
|
|
|
|
self.process_operation(
|
|
func,
|
|
openapi_spec,
|
|
operation_spec,
|
|
path_resource_names,
|
|
controller=controller,
|
|
operation_name=action,
|
|
method=method,
|
|
path=path,
|
|
)
|
|
|
|
else:
|
|
logging.warning(controller.__dict__.items())
|
|
logging.warning(contr.__dict__.items())
|
|
logging.warning("No operation found")
|
|
|
|
return operation_spec
|
|
|
|
def process_operation(
|
|
self,
|
|
func,
|
|
openapi_spec,
|
|
operation_spec,
|
|
path_resource_names,
|
|
*,
|
|
controller=None,
|
|
operation_name=None,
|
|
method=None,
|
|
start_version=None,
|
|
end_version=None,
|
|
mode=None,
|
|
path: str | None = None,
|
|
):
|
|
logging.info(
|
|
"%s: %s [%s]", (mode or "operation").title(), operation_name, func
|
|
)
|
|
# New decorators start having explicit null ApiVersion instead of being null
|
|
if (
|
|
start_version
|
|
and not isinstance(start_version, str)
|
|
and self._api_ver_major(start_version) in [0, None]
|
|
and self._api_ver_minor(start_version) in [0, None]
|
|
):
|
|
start_version = None
|
|
if (
|
|
end_version
|
|
and not isinstance(end_version, str)
|
|
and self._api_ver_major(end_version) in [0, None]
|
|
and self._api_ver_minor(end_version) in [0, None]
|
|
):
|
|
end_version = None
|
|
|
|
deser_schema = None
|
|
deser = getattr(controller, "deserializer", None)
|
|
if deser:
|
|
deser_schema = getattr(deser, "schema", None)
|
|
ser = getattr(controller, "serializer", None)
|
|
# deser_schema = getattr(deser, "schema", None)
|
|
ser_schema = getattr(ser, "schema", None)
|
|
if not ser_schema and hasattr(ser, "task_schema"):
|
|
# Image Task serializer is a bit different
|
|
ser_schema = getattr(ser, "task_schema")
|
|
|
|
if mode != "action":
|
|
doc = inspect.getdoc(func)
|
|
if doc and not operation_spec.description:
|
|
doc = rst_to_md(doc)
|
|
operation_spec.description = LiteralScalarString(doc)
|
|
if operation_spec.description:
|
|
# Reading spec from yaml file it was converted back to regular
|
|
# string. Therefore need to force it back to Literal block.
|
|
operation_spec.description = LiteralScalarString(
|
|
operation_spec.description
|
|
)
|
|
|
|
action_name = None
|
|
query_params_versions = []
|
|
body_schemas: list[str | None] | Unset = UNSET
|
|
expected_errors = ["404"]
|
|
response_code = None
|
|
# Version bound on an operation are set only when it is not an
|
|
# "action"
|
|
if (
|
|
mode != "action"
|
|
and start_version
|
|
and self._api_ver_major(start_version) != 0
|
|
):
|
|
if not (
|
|
"min-ver" in operation_spec.openstack
|
|
and tuple(
|
|
[
|
|
int(x)
|
|
for x in operation_spec.openstack["min-ver"].split(".")
|
|
]
|
|
)
|
|
< (self._api_ver(start_version))
|
|
):
|
|
operation_spec.openstack["min-ver"] = (
|
|
start_version.get_string()
|
|
if hasattr(start_version, "get_string")
|
|
else str(start_version)
|
|
)
|
|
|
|
if (
|
|
mode != "action"
|
|
and end_version
|
|
and self._api_ver_major(end_version)
|
|
):
|
|
if self._api_ver_major(end_version) == 0:
|
|
operation_spec.openstack.pop("max-ver", None)
|
|
operation_spec.deprecated = None
|
|
else:
|
|
# There is some end_version. Set the deprecated flag and wait
|
|
# for final version to be processed which drop it if max_ver
|
|
# is not set
|
|
operation_spec.deprecated = True
|
|
if not (
|
|
"max-ver" in operation_spec.openstack
|
|
and tuple(
|
|
[
|
|
int(x)
|
|
for x in operation_spec.openstack["max-ver"].split(
|
|
"."
|
|
)
|
|
]
|
|
)
|
|
> self._api_ver(end_version)
|
|
):
|
|
operation_spec.openstack["max-ver"] = (
|
|
end_version.get_string()
|
|
if hasattr(end_version, "get_string")
|
|
else str(end_version)
|
|
)
|
|
|
|
action_name = getattr(func, "wsgi_action", None)
|
|
if action_name:
|
|
operation_name = action_name
|
|
|
|
(
|
|
query_params_versions,
|
|
body_schemas,
|
|
response_body_schema,
|
|
expected_errors,
|
|
) = self._process_decorators(
|
|
func,
|
|
path_resource_names,
|
|
openapi_spec,
|
|
mode,
|
|
start_version,
|
|
end_version,
|
|
action_name,
|
|
operation_name,
|
|
)
|
|
|
|
if hasattr(func, "_wsme_definition"):
|
|
fdef = getattr(func, "_wsme_definition")
|
|
body_spec = getattr(fdef, "body_type", None)
|
|
if body_spec:
|
|
body_schema = _convert_wsme_to_jsonschema(body_spec)
|
|
schema_name = body_spec.__name__
|
|
openapi_spec.components.schemas.setdefault(
|
|
schema_name, TypeSchema(**body_schema)
|
|
)
|
|
if body_schemas is UNSET:
|
|
body_schemas = []
|
|
if isinstance(body_schemas, list):
|
|
body_schemas.append(f"#/components/schemas/{schema_name}")
|
|
rsp_spec = getattr(fdef, "return_type", None)
|
|
if rsp_spec:
|
|
ser_schema = _convert_wsme_to_jsonschema(rsp_spec)
|
|
response_code = getattr(fdef, "status_code", None)
|
|
|
|
if body_schemas is UNSET and deser_schema:
|
|
# Glance may have request deserializer attached schema
|
|
schema_name = (
|
|
"".join([x.title() for x in path_resource_names])
|
|
+ func.__name__.title()
|
|
+ "Request"
|
|
)
|
|
(body_schema, mime_type) = self._get_schema_ref(
|
|
openapi_spec,
|
|
schema_name,
|
|
description=f"Request of the {operation_spec.operationId} operation",
|
|
schema_def=deser_schema,
|
|
)
|
|
|
|
if query_params_versions:
|
|
so = sorted(
|
|
query_params_versions,
|
|
key=lambda d: (
|
|
tuple(map(int, d[1].split("."))) if d[1] else (0, 0)
|
|
),
|
|
)
|
|
for data, min_ver, max_ver in so:
|
|
self.process_query_parameters(
|
|
openapi_spec,
|
|
operation_spec,
|
|
path_resource_names,
|
|
data,
|
|
min_ver,
|
|
max_ver,
|
|
)
|
|
# if body_schemas or mode == "action":
|
|
if method in ["PUT", "POST", "PATCH"]:
|
|
self.process_body_parameters(
|
|
openapi_spec,
|
|
operation_spec,
|
|
path_resource_names,
|
|
body_schemas,
|
|
mode,
|
|
operation_name,
|
|
)
|
|
|
|
if ser_schema and not response_body_schema:
|
|
response_body_schema = ser_schema
|
|
responses_spec = operation_spec.responses
|
|
for error in expected_errors:
|
|
responses_spec.setdefault(str(error), {"description": "Error"})
|
|
|
|
if mode != "action" and str(error) == "410":
|
|
# This looks like a deprecated operation still hanging out there
|
|
operation_spec.deprecated = True
|
|
if not response_code:
|
|
response_codes = getattr(func, "wsgi_code", None)
|
|
if response_codes:
|
|
if not isinstance(response_codes, list):
|
|
response_codes = [response_codes]
|
|
# py starting from 3.11 magically works for str(enum.IntEnum),
|
|
# while older ones need an explicit conversion
|
|
response_codes = [
|
|
rc.value if isinstance(rc, enum.IntEnum) else rc
|
|
for rc in response_codes
|
|
]
|
|
else:
|
|
response_codes = [response_code]
|
|
if not response_codes:
|
|
# No expected response code known, take "normal" defaults
|
|
response_codes = self._get_response_codes(
|
|
method, operation_spec.operationId
|
|
)
|
|
if response_codes:
|
|
for response_code in response_codes:
|
|
rsp = responses_spec.setdefault(
|
|
str(response_code), {"description": "Ok"}
|
|
)
|
|
if str(response_code) != "204" and method != "DELETE":
|
|
# Arrange response placeholder
|
|
schema_name = (
|
|
"".join([x.title() for x in path_resource_names])
|
|
+ (
|
|
operation_name.replace("index", "list").title()
|
|
if not path_resource_names[-1].endswith(
|
|
operation_name
|
|
)
|
|
else ""
|
|
)
|
|
+ "Response"
|
|
)
|
|
(schema_ref, mime_type) = self._get_schema_ref(
|
|
openapi_spec,
|
|
schema_name,
|
|
description=(
|
|
f"Response of the {operation_spec.operationId} operation"
|
|
if not action_name
|
|
else f"Response of the {operation_spec.operationId}:{action_name} action"
|
|
), # noqa
|
|
schema_def=response_body_schema,
|
|
action_name=action_name,
|
|
)
|
|
|
|
if schema_ref:
|
|
curr_schema = (
|
|
rsp.get("content", {})
|
|
.get("application/json", {})
|
|
.get("schema", {})
|
|
)
|
|
if mode == "action" and curr_schema:
|
|
# There is existing response for the action. Need to
|
|
# merge them
|
|
if isinstance(curr_schema, dict):
|
|
curr_oneOf = curr_schema.get("oneOf")
|
|
curr_ref = curr_schema.get("$ref")
|
|
else:
|
|
curr_oneOf = curr_schema.oneOf
|
|
curr_ref = curr_schema.ref
|
|
if curr_oneOf:
|
|
if schema_ref not in [
|
|
x["$ref"] for x in curr_oneOf
|
|
]:
|
|
curr_oneOf.append({"$ref": schema_ref})
|
|
elif curr_ref and curr_ref != schema_ref:
|
|
rsp["content"]["application/json"][
|
|
"schema"
|
|
] = TypeSchema(
|
|
oneOf=[
|
|
{"$ref": curr_ref},
|
|
{"$ref": schema_ref},
|
|
]
|
|
)
|
|
else:
|
|
rsp["content"] = {
|
|
"application/json": {
|
|
"schema": {"$ref": schema_ref}
|
|
}
|
|
}
|
|
|
|
# Ensure operation tags are existing
|
|
for tag in operation_spec.tags:
|
|
if tag not in [x["name"] for x in openapi_spec.tags]:
|
|
openapi_spec.tags.append({"name": tag})
|
|
|
|
self._post_process_operation_hook(
|
|
openapi_spec, operation_spec, path=path
|
|
)
|
|
|
|
def _post_process_operation_hook(
|
|
self, openapi_spec, operation_spec, path: str | None = None
|
|
):
|
|
"""Hook to allow service specific generator to modify details"""
|
|
pass
|
|
|
|
def process_query_parameters(
|
|
self,
|
|
openapi_spec,
|
|
operation_spec,
|
|
path_resource_names,
|
|
obj,
|
|
min_ver,
|
|
max_ver,
|
|
):
|
|
"""Process query parameters in different versions
|
|
|
|
It is expected, that this method is invoked in the raising min_ver order to do proper cleanup of max_ver
|
|
"""
|
|
if "type" not in obj:
|
|
# Nova has empty definitions for deprecated methods
|
|
return
|
|
# Yey - we have query_parameters
|
|
if obj["type"] == "object":
|
|
params = obj["properties"]
|
|
for prop, spec in params.items():
|
|
param_name = "_".join(path_resource_names) + f"_{prop}"
|
|
|
|
param_attrs: dict[str, TypeSchema | dict] = {}
|
|
if spec == {}:
|
|
# Nova added empty params since it was never validating them. Skip
|
|
param_attrs["schema"] = TypeSchema(type="string")
|
|
elif spec["type"] == "array":
|
|
param_attrs["schema"] = TypeSchema(
|
|
**copy.deepcopy(spec["items"])
|
|
)
|
|
else:
|
|
param_attrs["schema"] = TypeSchema(**copy.deepcopy(spec))
|
|
param_attrs["description"] = spec.get("description")
|
|
if min_ver:
|
|
os_ext = param_attrs.setdefault("x-openstack", {})
|
|
os_ext["min-ver"] = min_ver
|
|
if max_ver:
|
|
os_ext = param_attrs.setdefault("x-openstack", {})
|
|
os_ext["max-ver"] = max_ver
|
|
ref_name = self._get_param_ref(
|
|
openapi_spec,
|
|
param_name,
|
|
prop,
|
|
param_location="query",
|
|
path=None,
|
|
**param_attrs,
|
|
)
|
|
if ref_name not in [x.ref for x in operation_spec.parameters]:
|
|
operation_spec.parameters.append(
|
|
ParameterSchema(ref=ref_name)
|
|
)
|
|
|
|
else:
|
|
raise RuntimeError(
|
|
f"Query parameters {obj} is not an object as expected"
|
|
)
|
|
|
|
def process_body_parameters(
|
|
self,
|
|
openapi_spec,
|
|
operation_spec,
|
|
path_resource_names,
|
|
body_schemas,
|
|
mode,
|
|
action_name,
|
|
):
|
|
# Body is not expected, exit (unless we are in the "action")
|
|
if body_schemas is None or (body_schemas == [] and mode != "action"):
|
|
return
|
|
mime_type: str | None = "application/json"
|
|
schema_name = None
|
|
schema_ref: str | Unset | None = None
|
|
# We should not modify path_resource_names of the caller
|
|
path_resource_names = path_resource_names.copy()
|
|
# Create container schema with version discriminator
|
|
if action_name:
|
|
path_resource_names.append(action_name)
|
|
|
|
cont_schema_name = (
|
|
"".join([x.title() for x in path_resource_names]) + "Request"
|
|
)
|
|
cont_schema = None
|
|
|
|
if body_schemas is not UNSET and len(body_schemas) == 1:
|
|
# There is only one body known at the moment
|
|
# None is a special case with explicitly no body supported
|
|
if True: # body_schemas[0] is not UNSET:
|
|
if cont_schema_name in openapi_spec.components.schemas:
|
|
# if we have already oneOf - add there
|
|
cont_schema = openapi_spec.components.schemas[
|
|
cont_schema_name
|
|
]
|
|
if cont_schema.oneOf and body_schemas[0] not in [
|
|
x["$ref"] for x in cont_schema.oneOf
|
|
]:
|
|
cont_schema.oneOf.append({"$ref": body_schemas[0]})
|
|
schema_ref = f"#/components/schemas/{cont_schema_name}"
|
|
else:
|
|
# otherwise just use schema as body
|
|
schema_ref = body_schemas[0]
|
|
elif body_schemas is not UNSET and len(body_schemas) > 1:
|
|
# We may end up here multiple times if we have versioned operation. In this case merge to what we have already
|
|
op_body = operation_spec.requestBody.setdefault("content", {})
|
|
old_schema = op_body.get(mime_type, {}).get("schema", {})
|
|
old_ref = (
|
|
old_schema.ref
|
|
if isinstance(old_schema, TypeSchema)
|
|
else old_schema.get("$ref")
|
|
)
|
|
cont_schema = openapi_spec.components.schemas.setdefault(
|
|
cont_schema_name,
|
|
TypeSchema(
|
|
oneOf=[], openstack={"discriminator": "microversion"}
|
|
),
|
|
)
|
|
# Add new refs to the container oneOf if they are not already
|
|
# there
|
|
cont_schema.oneOf.extend(
|
|
[
|
|
{"$ref": n}
|
|
for n in body_schemas
|
|
if n not in [x.get("$ref") for x in cont_schema.oneOf]
|
|
]
|
|
)
|
|
schema_ref = f"#/components/schemas/{cont_schema_name}"
|
|
if (
|
|
old_ref
|
|
and old_ref != schema_ref
|
|
and old_ref not in [x["$ref"] for x in cont_schema.oneOf]
|
|
):
|
|
# In a previous iteration we only had one schema and decided
|
|
# not to create container. Now we need to change that by
|
|
# merging with previous data
|
|
cont_schema.oneOf.append({"$ref": old_ref})
|
|
elif mode == "action":
|
|
# There are actions without a real body description, but we know that action requires dummy body
|
|
cont_schema = openapi_spec.components.schemas.setdefault(
|
|
cont_schema_name,
|
|
TypeSchema(
|
|
description=f"Empty body for {action_name} action",
|
|
type="object",
|
|
properties={action_name: {"type": "null"}},
|
|
openstack={"action-name": action_name},
|
|
),
|
|
)
|
|
schema_ref = f"#/components/schemas/{cont_schema_name}"
|
|
elif body_schemas is UNSET:
|
|
# We know nothing about request
|
|
schema_name = (
|
|
"".join([x.title() for x in path_resource_names])
|
|
+ (
|
|
action_name.replace("index", "list").title()
|
|
if not path_resource_names[-1].endswith(action_name)
|
|
else ""
|
|
)
|
|
+ "Request"
|
|
)
|
|
(schema_ref, mime_type) = self._get_schema_ref(
|
|
openapi_spec,
|
|
schema_name,
|
|
description=f"Request of the {operation_spec.operationId} operation",
|
|
action_name=action_name,
|
|
schema_def=UNSET,
|
|
)
|
|
|
|
if mode == "action":
|
|
op_body = operation_spec.requestBody.setdefault("content", {})
|
|
js_content = op_body.setdefault(mime_type, {})
|
|
body_schema = js_content.setdefault("schema", {})
|
|
one_of = body_schema.setdefault("oneOf", [])
|
|
if schema_ref and schema_ref not in [
|
|
x.get("$ref") for x in one_of
|
|
]:
|
|
one_of.append({"$ref": schema_ref})
|
|
os_ext = body_schema.setdefault("x-openstack", {})
|
|
os_ext["discriminator"] = "action"
|
|
if cont_schema and action_name:
|
|
cont_schema.openstack["action-name"] = action_name
|
|
elif schema_ref is not None:
|
|
op_body = operation_spec.requestBody.setdefault("content", {})
|
|
js_content = op_body.setdefault(mime_type, {})
|
|
body_schema = js_content.setdefault("schema", {})
|
|
operation_spec.requestBody["content"][mime_type]["schema"] = (
|
|
TypeSchema(ref=schema_ref)
|
|
)
|
|
|
|
def _sanitize_schema(
|
|
self, schema, *, start_version=None, end_version=None
|
|
):
|
|
"""Various schemas are broken in various ways"""
|
|
|
|
if isinstance(schema, dict):
|
|
# Forcibly convert to TypeSchema
|
|
schema = TypeSchema(**schema)
|
|
properties = getattr(schema, "properties", None)
|
|
if properties:
|
|
# Nova aggregates schemas are broken since they have "type": "object" inside "properties
|
|
if properties.get("type") == "object":
|
|
schema.properties.pop("type")
|
|
|
|
if "anyOf" in properties:
|
|
# anyOf must be on the properties level and not under (nova host update)
|
|
anyOf = schema.properties.pop("anyOf")
|
|
schema.anyOf = anyOf
|
|
|
|
for k, v in properties.items():
|
|
typ = v.get("type")
|
|
if typ == "object":
|
|
schema.properties[k] = self._sanitize_schema(v)
|
|
if typ == "array" and "additionalItems" in v:
|
|
# additionalItems have nothing to do under the type array (create servergroup)
|
|
schema.properties[k].pop("additionalItems")
|
|
if (
|
|
typ == "array"
|
|
and "items" in v
|
|
and isinstance(v["items"], list)
|
|
):
|
|
# server_group create - type array "items" is a dict and not list
|
|
# NOTE: server_groups recently changed to "prefixItems",
|
|
# so this may be not necessary anymore
|
|
schema.properties[k]["items"] = v["items"][0]
|
|
if start_version and self._api_ver_major(start_version) not in [
|
|
"0",
|
|
0,
|
|
]:
|
|
if not schema.openstack:
|
|
schema.openstack = {}
|
|
schema.openstack["min-ver"] = start_version.get_string()
|
|
if end_version and self._api_ver_major(end_version) not in ["0", 0]:
|
|
if not schema.openstack:
|
|
schema.openstack = {}
|
|
schema.openstack["max-ver"] = end_version.get_string()
|
|
return schema
|
|
|
|
def _get_param_ref(
|
|
self,
|
|
openapi_spec,
|
|
ref_name: str,
|
|
param_name: str,
|
|
param_location: str,
|
|
path: str | None = None,
|
|
**param_attrs,
|
|
) -> str:
|
|
if ref_name == "_project_id":
|
|
ref_name = "project_id"
|
|
ref_name = ref_name.replace(":", "_")
|
|
# Pop extensions for easier post processing
|
|
if param_attrs:
|
|
os_ext = param_attrs.pop("x-openstack", {})
|
|
else:
|
|
os_ext = {}
|
|
# Ensure global parameter is present
|
|
param = ParameterSchema(
|
|
location=param_location, name=param_name, **param_attrs
|
|
)
|
|
if param_location == "path":
|
|
param.required = True
|
|
if not param.description and path:
|
|
param.description = f"{param_name} parameter for {path} API"
|
|
# We can only assume the param type. For path it is logically a string only
|
|
if not param.type_schema:
|
|
param.type_schema = TypeSchema(type="string")
|
|
if os_ext and ("min-ver" in os_ext or "max-ver" in os_ext):
|
|
# min_ver is present
|
|
old_param = openapi_spec.components.parameters.get(ref_name, None)
|
|
if not old_param:
|
|
# Param was not present, just set what we have
|
|
param.openstack = os_ext
|
|
else:
|
|
# Param is already present. Check whether we need to modify min_ver
|
|
min_ver = os_ext.get("min-ver")
|
|
max_ver = os_ext.get("max-ver")
|
|
param.openstack = {}
|
|
if not old_param.openstack:
|
|
old_param.openstack = {}
|
|
old_min_ver = old_param.openstack.get("min-ver")
|
|
old_max_ver = old_param.openstack.get("max-ver")
|
|
if old_min_ver and tuple(old_min_ver.split(".")) < tuple(
|
|
min_ver.split(".")
|
|
):
|
|
# Existing param has lower min_ver. Keep the old value
|
|
os_ext["min-ver"] = old_min_ver
|
|
if (
|
|
old_max_ver
|
|
and max_ver
|
|
and tuple(old_max_ver.split("."))
|
|
> tuple(max_ver.split("."))
|
|
):
|
|
# Existing param has max_ver higher then what we have now. Keep old value
|
|
os_ext["max_ver"] = old_max_ver
|
|
if param_name == "user_id":
|
|
os_ext["resource_link"] = "identity/v3/user.id"
|
|
if param_name == "domain_id":
|
|
os_ext["resource_link"] = "identity/v3/domain.id"
|
|
if param_name == "project_id":
|
|
os_ext["resource_link"] = "identity/v3/project.id"
|
|
if os_ext != {}:
|
|
param.openstack = os_ext
|
|
|
|
# Overwrite param
|
|
openapi_spec.components.parameters[ref_name] = param
|
|
return f"#/components/parameters/{ref_name}"
|
|
|
|
def _get_schema_ref(
|
|
self,
|
|
openapi_spec,
|
|
name,
|
|
description: str | None = None,
|
|
schema_def=UNSET,
|
|
action_name=None,
|
|
) -> tuple[str | None, str | None]:
|
|
if schema_def is UNSET:
|
|
logging.warning(
|
|
"No Schema definition for %s[%s] is known", name, action_name
|
|
)
|
|
# Create dummy schema since we got no data for it
|
|
schema_def = {
|
|
"type": "object",
|
|
"description": LiteralScalarString(description),
|
|
}
|
|
if schema_def is not None:
|
|
schema = openapi_spec.components.schemas.setdefault(
|
|
name, TypeSchema(**schema_def)
|
|
)
|
|
|
|
if action_name:
|
|
if not schema.openstack:
|
|
schema.openstack = {}
|
|
schema.openstack.setdefault("action-name", action_name)
|
|
|
|
return (f"#/components/schemas/{name}", "application/json")
|
|
else:
|
|
return (None, None)
|
|
|
|
def _get_tags_for_url(self, url):
|
|
"""Return Tag (group) name based on the URL"""
|
|
# Drop version prefix
|
|
url = re.sub(r"^(/v[0-9\.]*/)", "/", url)
|
|
|
|
for k, v in self.URL_TAG_MAP.items():
|
|
if url.startswith(k):
|
|
return [v]
|
|
if url == "/":
|
|
return ["version"]
|
|
path_elements: list[str] = list(filter(None, url.split("/")))
|
|
for el in path_elements:
|
|
# Use 1st (non project_id) path element as tag
|
|
if not el.startswith("{"):
|
|
return [el]
|
|
|
|
@classmethod
|
|
def _get_response_codes(cls, method: str, operationId: str) -> list[str]:
|
|
if method == "DELETE":
|
|
response_code = "204"
|
|
elif method == "POST":
|
|
response_code = "201"
|
|
else:
|
|
response_code = "200"
|
|
return [response_code]
|
|
|
|
def _process_decorators(
|
|
self,
|
|
func,
|
|
path_resource_names: list[str],
|
|
openapi_spec,
|
|
mode: str,
|
|
start_version,
|
|
end_version,
|
|
action_name: str | None = None,
|
|
operation_name: str | None = None,
|
|
):
|
|
"""Extract schemas from the decorated method."""
|
|
# Unwrap operation decorators to access all properties
|
|
expected_errors: list[str] = []
|
|
body_schemas: list[str | None] | Unset = UNSET
|
|
query_params_versions: list[tuple] = []
|
|
response_body_schema: dict | Unset | None = UNSET
|
|
|
|
f = func
|
|
while hasattr(f, "__wrapped__"):
|
|
closure = inspect.getclosurevars(f)
|
|
closure_locals = closure.nonlocals
|
|
min_ver = closure_locals.get("min_version", start_version)
|
|
if min_ver and not isinstance(min_ver, str):
|
|
min_ver = (
|
|
min_ver.get_string()
|
|
if hasattr(min_ver, "get_string")
|
|
else str(min_ver)
|
|
)
|
|
max_ver = closure_locals.get("max_version", end_version)
|
|
if max_ver and not isinstance(max_ver, str):
|
|
max_ver = (
|
|
max_ver.get_string()
|
|
if hasattr(max_ver, "get_string")
|
|
else str(max_ver)
|
|
)
|
|
|
|
if "errors" in closure_locals:
|
|
expected_errors = closure_locals["errors"]
|
|
if isinstance(expected_errors, list):
|
|
expected_errors = [
|
|
str(x)
|
|
for x in filter(
|
|
lambda x: isinstance(x, int), expected_errors
|
|
)
|
|
]
|
|
elif isinstance(expected_errors, int):
|
|
expected_errors = [str(expected_errors)]
|
|
if "request_body_schema" in closure_locals or hasattr(
|
|
f, "_request_body_schema"
|
|
):
|
|
# Body type is known through method decorator
|
|
obj = closure_locals.get(
|
|
"request_body_schema",
|
|
getattr(f, "_request_body_schema", {}),
|
|
)
|
|
# body schemas are not UNSET anymore
|
|
if body_schemas is UNSET:
|
|
body_schemas = []
|
|
if obj is not None:
|
|
if obj.get("type") in ["object", "array"]:
|
|
# We only allow object and array bodies
|
|
# To prevent type name collision keep module name part of the name
|
|
typ_name = (
|
|
"".join([x.title() for x in path_resource_names])
|
|
+ func.__name__.title()
|
|
+ (
|
|
f"_{min_ver.replace('.', '')}"
|
|
if min_ver
|
|
else ""
|
|
)
|
|
)
|
|
comp_schema = (
|
|
openapi_spec.components.schemas.setdefault(
|
|
typ_name,
|
|
self._sanitize_schema(
|
|
copy.deepcopy(obj),
|
|
start_version=start_version,
|
|
end_version=end_version,
|
|
),
|
|
)
|
|
)
|
|
|
|
if min_ver:
|
|
if not comp_schema.openstack:
|
|
comp_schema.openstack = {}
|
|
comp_schema.openstack["min-ver"] = min_ver
|
|
if max_ver:
|
|
if not comp_schema.openstack:
|
|
comp_schema.openstack = {}
|
|
comp_schema.openstack["max-ver"] = max_ver
|
|
if mode == "action":
|
|
if not comp_schema.openstack:
|
|
comp_schema.openstack = {}
|
|
comp_schema.openstack["action-name"] = action_name
|
|
|
|
ref_name = f"#/components/schemas/{typ_name}"
|
|
if isinstance(body_schemas, list):
|
|
body_schemas.append(ref_name)
|
|
else:
|
|
# register no-body
|
|
if isinstance(body_schemas, list):
|
|
body_schemas.append(None)
|
|
|
|
if "response_body_schema" in closure_locals or hasattr(
|
|
f, "_response_body_schema"
|
|
):
|
|
# Response type is known through method decorator - PERFECT
|
|
obj = closure_locals.get(
|
|
"response_body_schema",
|
|
getattr(f, "_response_body_schema", {}),
|
|
)
|
|
response_body_schema = obj
|
|
if "query_params_schema" in closure_locals or hasattr(
|
|
f, "_request_query_schema"
|
|
):
|
|
obj = closure_locals.get(
|
|
"query_params_schema",
|
|
getattr(f, "_request_query_schema", {}),
|
|
)
|
|
query_params_versions.append((obj, min_ver, max_ver))
|
|
if "validators" in closure_locals:
|
|
validators = closure_locals.get("validators")
|
|
body_schemas = []
|
|
if isinstance(validators, dict):
|
|
for k, v in validators.items():
|
|
sig = inspect.signature(v)
|
|
vals = sig.parameters.get("validators", None)
|
|
if vals:
|
|
sig2 = inspect.signature(vals.default[0])
|
|
schema_param = sig2.parameters.get("schema", None)
|
|
if schema_param:
|
|
schema = schema_param.default
|
|
|
|
typ_name = (
|
|
"".join(
|
|
[
|
|
x.title()
|
|
for x in path_resource_names
|
|
]
|
|
)
|
|
+ func.__name__.title()
|
|
+ (
|
|
f"_{min_ver.replace('.', '')}"
|
|
if min_ver
|
|
else ""
|
|
)
|
|
)
|
|
comp_schema = (
|
|
openapi_spec.components.schemas.setdefault(
|
|
typ_name,
|
|
self._sanitize_schema(
|
|
copy.deepcopy(schema),
|
|
start_version=None,
|
|
end_version=None,
|
|
),
|
|
)
|
|
)
|
|
|
|
ref_name = f"#/components/schemas/{typ_name}"
|
|
if isinstance(body_schemas, list):
|
|
body_schemas.append(ref_name)
|
|
|
|
f = f.__wrapped__
|
|
|
|
return (
|
|
query_params_versions,
|
|
body_schemas,
|
|
response_body_schema,
|
|
expected_errors,
|
|
)
|
|
|
|
|
|
def _convert_wsme_to_jsonschema(body_spec):
|
|
"""Convert WSME type description to JsonSchema"""
|
|
res: dict[str, Any] = {}
|
|
if wtypes.iscomplex(body_spec) or isinstance(body_spec, wtypes.wsattr):
|
|
res = {"type": "object", "properties": {}}
|
|
doc = inspect.getdoc(body_spec)
|
|
if doc:
|
|
res.setdefault("description", LiteralScalarString(doc))
|
|
required = set()
|
|
for attr in wtypes.list_attributes(body_spec):
|
|
attr_value = getattr(body_spec, attr.key)
|
|
if isinstance(attr_value, wtypes.wsproperty):
|
|
r = _convert_wsme_to_jsonschema(attr_value)
|
|
else:
|
|
r = _convert_wsme_to_jsonschema(attr_value._get_datatype())
|
|
res["properties"][attr.key] = r
|
|
if attr.mandatory:
|
|
required.add(attr.name)
|
|
# todo: required
|
|
if required:
|
|
res.setdefault("required", list(required))
|
|
elif isinstance(body_spec, wtypes.ArrayType):
|
|
res = {
|
|
"type": "array",
|
|
"items": _convert_wsme_to_jsonschema(body_spec.item_type),
|
|
}
|
|
elif isinstance(body_spec, wtypes.StringType) or body_spec is str:
|
|
res = {"type": "string"}
|
|
min_len = getattr(body_spec, "min_length", None)
|
|
max_len = getattr(body_spec, "max_length", None)
|
|
if min_len:
|
|
res["minLength"] = min_len
|
|
if max_len:
|
|
res["maxLength"] = max_len
|
|
elif isinstance(body_spec, wtypes.IntegerType):
|
|
res = {"type": "integer"}
|
|
minimum = getattr(body_spec, "minimum", None)
|
|
maximum = getattr(body_spec, "maximum", None)
|
|
if minimum:
|
|
res["minimum"] = minimum
|
|
if maximum:
|
|
res["maximum"] = maximum
|
|
elif isinstance(body_spec, wtypes.Enum):
|
|
basetype = body_spec.basetype
|
|
values = body_spec.values
|
|
if basetype is str:
|
|
res = {"type": "string"}
|
|
elif basetype is float:
|
|
res = {"type": "number"}
|
|
elif basetype is int:
|
|
res = {"type": "integer"}
|
|
else:
|
|
raise RuntimeError(f"Unsupported basetype {basetype}")
|
|
res["enum"] = list(values)
|
|
# elif hasattr(body_spec, "__name__") and body_spec.__name__ == "bool":
|
|
elif wtypes.isdict(body_spec):
|
|
res = {
|
|
"type": "object",
|
|
"additionalProperties": _convert_wsme_to_jsonschema(
|
|
body_spec.value_type
|
|
),
|
|
}
|
|
elif wtypes.isusertype(body_spec):
|
|
basetype = body_spec.basetype
|
|
name = body_spec.name
|
|
if basetype is str:
|
|
res = {"type": "string", "format": name}
|
|
else:
|
|
raise RuntimeError(f"Unsupported basetype {basetype}")
|
|
elif isinstance(body_spec, wtypes.wsproperty):
|
|
res = _convert_wsme_to_jsonschema(body_spec.datatype)
|
|
elif body_spec is bool:
|
|
# wsattr(bool) lands here as <class 'bool'>
|
|
res = {"type": "boolean"}
|
|
elif body_spec is float:
|
|
res = {"type": "number", "format": "float"}
|
|
elif (
|
|
isinstance(body_spec, wtypes.dt_types)
|
|
or body_spec is datetime.datetime
|
|
):
|
|
res = {"type": "string", "format": "date-time"}
|
|
else:
|
|
raise RuntimeError(f"Unsupported object {body_spec}")
|
|
|
|
return res
|