Files
codegenerator/codegenerator/openapi/barbican.py
Gorka Eguileor 87ff2fe25e YAML version support
When exporting OpenAPI in YAML format we are exporting in YAML version
1.2, which is the latest. The problem with that is that there are tools
that use the PyYAML [1] python package to read these files, and that
package only supports YAML v1.1, which will lead to reading things
incorrectly.

An example is with booleans.

In v1.1 a lot of values are considered as booleans (case insensitive):
true, false, 1, 0, off, on, no, yes... But in v1.2 only true and false
are considered booleans, so the others don't need to be quoted.

As an example we were generating something like this:

```
        os_hypervisors_with_servers:
          in: query
          name: with_servers
          schema:
            type:
              - boolean
              - string
            enum:
              - true
              - 'True'
              - 'TRUE'
              - 'true'
              - '1'
              - ON
              - On
              - on
              - YES
              - Yes
              - yes
              - false
              - 'False'
              - 'FALSE'
              - 'false'
              - '0'
              - OFF
              - Off
              - off
              - NO
              - No
              - no
          x-openstack:
            min-ver: '2.53'
```

Which is incorrectly interpreted by PyYAML like this:

```
            enum:
              - true
              - 'True'
              - 'TRUE'
              - 'true'
              - '1'
              - true
              - true
              - true
              - true
              - true
              - true
              - false
              - 'False'
              - 'FALSE'
              - 'false'
              - '0'
              - false
              - false
              - false
              - false
              - false
              - false
```

To fix this, the tool can now emit the specs in YAML v1.1 through a new
`--yaml-version` parameter: when it is set to 1.1, every value that a
v1.1 parser could misinterpret as a boolean is quoted.

[1]: https://pypi.org/project/PyYAML/

Change-Id: I7236f6a94ccb2e92a086c16895efa4dc557460c4
2025-06-05 18:52:44 +02:00

545 lines
18 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import inspect
from multiprocessing import Process
from pathlib import Path
from unittest import mock
import fixtures
from codegenerator.common.schema import SpecSchema
from codegenerator.openapi.base import OpenStackServerSourceBase
from codegenerator.openapi.utils import merge_api_ref_doc
from codegenerator.openapi.barbican_schemas import container
from codegenerator.openapi.barbican_schemas import order
from codegenerator.openapi.barbican_schemas import quota
from codegenerator.openapi.barbican_schemas import secret
from codegenerator.openapi.barbican_schemas import secret_store
from ruamel.yaml.scalarstring import LiteralScalarString
# OpenAPI source generator for Barbican (the spec it writes is titled
# "OpenStack key-manager API"), built on OpenStackServerSourceBase.
class BarbicanGenerator(OpenStackServerSourceBase):
# Empty mapping; presumably consumed by the base class to assign tags to
# URLs -- TODO confirm against OpenStackServerSourceBase.
URL_TAG_MAP = {}
# Schema modules probed, in this order, by _post_process_operation_hook and
# _get_schema_ref for same-named module-level hooks.
RESOURCE_MODULES = [container, order, quota, secret, secret_store]
def __init__(self):
pass
def _api_ver_major(self, ver):
return ver.ver_major
def _api_ver_minor(self, ver):
return ver.ver_minor
def _api_ver(self, ver):
return (ver.ver_major, ver.ver_minor)
def _build_routes(
self, mapper, node, path="", param_url_map: dict[str, str] = {}
):
resource: str | None = None
# Construct resource name from the path
parent = path.split("/")[-1]
if parent == "v1":
resource = ""
elif parent.endswith("ies"):
resource = parent[0 : len(parent) - 3] + "y"
else:
resource = parent[0:-1]
for part in [x for x in dir(node) if callable(getattr(node, x))]:
# Iterate over functions to find what is exposed on the current
# level
obj = getattr(node, part)
_pecan = getattr(obj, "_pecan", None)
exposed = getattr(obj, "exposed", None)
if _pecan and exposed:
# Only whatever is pecan exposed is of interest
conditions = {}
action = None
url = path
# resource = None
# parent = url.split("/")[-1]
# Identify the action from function name
# https://pecan.readthedocs.io/en/latest/rest.html#url-mapping
if part == "on_get":
if not resource:
continue
conditions["method"] = ["GET"]
action = "show"
url += f"/{{{resource}_id}}"
elif part == "index":
conditions["method"] = ["GET"]
action = "list"
elif part == "get":
conditions["method"] = ["GET"]
action = "get"
# "Get" is tricky, it can be normal and root, so need to inspect params
sig = inspect.signature(obj)
for pname, pval in sig.parameters.items():
if "id" in pname and pval.default == pval.empty:
url += f"/{{{resource}_id}}"
elif part == "on_post":
conditions["method"] = ["POST"]
action = "create"
# url += f"/{{{resource}_id}}"
elif part == "on_put":
conditions["method"] = ["PUT"]
action = "update"
url += f"/{{{resource}_id}}"
elif part == "on_delete":
conditions["method"] = ["DELETE"]
action = "delete"
url += f"/{{{resource}_id}}"
elif part.startswith("get_"):
conditions["method"] = ["GET"]
action = part[4:]
url += f"/{action.replace('_', '-')}"
if action:
# If we identified method as "interesting" register it into
# the routes mapper
mapper.connect(
None,
url,
controller=obj,
action=action,
conditions=conditions,
)
if not hasattr(node, "__dict__"):
return
for subcontroller, v in (
node.__dict__.items()
if isinstance(node, object)
else node.__class__.__dict__.items()
):
# Iterate over node attributes for subcontrollers
if subcontroller.startswith("_"):
continue
# if subcontroller in ["central_api", "__wrapped__", "SORT_KEYS"]:
# # Not interested in those
# continue
subpath = f"{path}/{subcontroller}"
next_param_url_map: dict = {
k: f"{v}/{subcontroller}" for k, v in param_url_map.items()
}
self._build_routes(mapper, v, subpath, next_param_url_map)
return
def generate(self, target_dir, args):
    """Run :meth:`_generate` in a child process and wait for it to finish.

    :param target_dir: Forwarded unchanged to ``_generate``.
    :param args: Forwarded unchanged to ``_generate``.
    :raises RuntimeError: If the child process exits with a non-zero code.
    """
    worker = Process(target=self._generate, args=[target_dir, args])
    worker.start()
    worker.join()
    if worker.exitcode == 0:
        return
    raise RuntimeError("Error generating Barbican OpenAPI schema")
def _generate(self, target_dir, args):
    """Build the Barbican key-manager OpenAPI spec and write it to disk.

    The spec is loaded from (or freshly created for)
    ``<target_dir>/openapi_specs/key-manager/v<MAX_API_VERSION>.yaml``,
    populated from the routes discovered on the Barbican WSGI application
    plus a fixed table of explicitly registered routes, optionally merged
    with the api-ref documentation, dumped back to that file and finally
    made reachable through a ``v1.yaml`` symlink placed next to it.

    :param target_dir: Directory under which ``openapi_specs`` is created.
    :param args: Options object; ``api_ref_src``, ``validate`` and
        ``yaml_version`` are read from it.
    :returns: Path of the written spec file.
    """
    # Barbican and routes are imported here, not at module level.
    from barbican.api import app
    from barbican.api.controllers import acls
    from barbican.api.controllers import containers
    from barbican.api.controllers import consumers
    from barbican.api.controllers import orders
    from barbican.api.controllers import quotas
    from barbican.api.controllers import secrets
    from barbican.api.controllers import secretmeta
    from barbican.api.controllers import secretstores
    from barbican.api.controllers import transportkeys
    from barbican.api.controllers import versions
    from routes import Mapper

    self.api_version = versions.MAX_API_VERSION
    self.min_api_version = versions.MIN_API_VERSION

    work_dir = Path(target_dir)
    work_dir.mkdir(parents=True, exist_ok=True)

    impl_path = Path(
        work_dir, "openapi_specs", "key-manager", f"v{self.api_version}.yaml"
    )
    impl_path.parent.mkdir(parents=True, exist_ok=True)

    openapi_spec = self.load_openapi(Path(impl_path))
    if not openapi_spec:
        # Nothing usable was loaded: start from a skeleton spec
        api_key_scheme = {
            "type": "apiKey",
            "in": "header",
            "name": "X-Auth-Token",
        }
        openapi_spec = SpecSchema(
            info={
                "title": "OpenStack key-manager API",
                "description": LiteralScalarString(
                    "key-manager API provided by Barbican service"
                ),
                "version": self.api_version,
            },
            openapi="3.1.0",
            security=[{"ApiKeyAuth": []}],
            components={"securitySchemes": {"ApiKeyAuth": api_key_scheme}},
        )

    self.app = app.build_wsgi_app(controller=None)
    self.root = self.app.root

    mapper = Mapper()
    self._build_routes(mapper, self.root, "/v1")

    # Routes registered explicitly in addition to those found by
    # _build_routes: URL -> ordered (controller, action, HTTP method)
    # triples. The order of this table is the registration order.
    explicit_routes = {
        "/v1/secrets/{secret_id}": [
            (secrets.SecretController.on_get, "get", "GET"),
            (secrets.SecretController.on_put, "create", "PUT"),
            (secrets.SecretController.on_delete, "delete", "DELETE"),
        ],
        "/v1/secrets/{secret_id}/payload": [
            (secrets.SecretController.payload, "payload", "GET"),
        ],
        # Secret ACL
        "/v1/secrets/{secret_id}/acl": [
            (acls.SecretACLsController.on_get, "get", "GET"),
            (acls.SecretACLsController.on_put, "create", "PUT"),
            (acls.SecretACLsController.on_patch, "update", "PATCH"),
            (acls.SecretACLsController.on_delete, "delete", "DELETE"),
        ],
        # Secret metadata
        "/v1/secrets/{secret_id}/metadata": [
            (secretmeta.SecretMetadataController.on_get, "list", "GET"),
            (secretmeta.SecretMetadataController.on_put, "create", "PUT"),
            (secretmeta.SecretMetadataController.on_post, "replace", "POST"),
        ],
        "/v1/secrets/{secret_id}/metadata/{key}": [
            (secretmeta.SecretMetadatumController.on_get, "show", "GET"),
            (secretmeta.SecretMetadatumController.on_put, "update", "PUT"),
            (
                secretmeta.SecretMetadatumController.on_delete,
                "delete",
                "DELETE",
            ),
        ],
        "/v1/secrets/{secret_id}/consumers": [
            (consumers.SecretConsumersController.on_get, "list", "GET"),
            (consumers.SecretConsumersController.on_post, "create", "POST"),
            (
                consumers.SecretConsumersController.on_delete,
                "delete",
                "DELETE",
            ),
        ],
        # Secret stores
        "/v1/secret-stores/{secret_store_id}/preferred": [
            (
                secretstores.PreferredSecretStoreController.on_delete,
                "delete",
                "DELETE",
            ),
            (
                secretstores.PreferredSecretStoreController.on_post,
                "set",
                "POST",
            ),
        ],
        # Containers
        "/v1/containers/{container_id}": [
            (containers.ContainerController.on_get, "show", "GET"),
            (containers.ContainerController.on_delete, "delete", "DELETE"),
        ],
        "/v1/containers/{container_id}/secrets": [
            (
                containers.ContainersSecretsController.on_post,
                "create",
                "POST",
            ),
            (
                containers.ContainersSecretsController.on_delete,
                "delete",
                "DELETE",
            ),
        ],
        # Container consumers
        "/v1/containers/{container_id}/consumers": [
            (consumers.ContainerConsumersController.on_get, "get", "GET"),
            (consumers.ContainerConsumersController.on_post, "post", "POST"),
            (
                consumers.ContainerConsumersController.on_delete,
                "delete",
                "DELETE",
            ),
        ],
        # Container ACL
        "/v1/containers/{container_id}/acl": [
            (acls.ContainerACLsController.on_get, "get", "GET"),
            (acls.ContainerACLsController.on_put, "create", "PUT"),
            (acls.ContainerACLsController.on_patch, "update", "PATCH"),
            (acls.ContainerACLsController.on_delete, "delete", "DELETE"),
        ],
        # Orders
        "/v1/orders/{order_id}": [
            (orders.OrderController.on_get, "show", "GET"),
            (orders.OrderController.on_delete, "delete", "DELETE"),
        ],
        # Transport keys
        "/v1/transport_keys/{transport_key_id}": [
            (transportkeys.TransportKeyController.on_get, "get", "GET"),
            (
                transportkeys.TransportKeyController.on_delete,
                "delete",
                "DELETE",
            ),
        ],
        # Quotas
        "/v1/project-quotas/{project_id}": [
            (quotas.ProjectQuotasController.on_put, "update", "PUT"),
            (quotas.ProjectQuotasController.on_delete, "delete", "DELETE"),
        ],
    }
    for url, handlers in explicit_routes.items():
        for controller, action, http_method in handlers:
            mapper.connect(
                None,
                url,
                controller=controller,
                action=action,
                conditions={"method": [http_method]},
            )

    # (routelist, method) combinations that must not reach the spec
    skipped_routes = (
        # update order is not supported
        (["/v1/orders/", {"type": ":", "name": "order_id"}], ["PUT"]),
        # Get of the project quota is done using different URL
        (["/v1/quotas/", {"type": ":", "name": "quota_id"}], ["GET"]),
    )
    for route in mapper.matchlist:
        # The method is only looked up once the routelist has matched
        if any(
            route.routelist == routelist
            and route.conditions.get("method", "GET") == methods
            for routelist, methods in skipped_routes
        ):
            continue
        self._process_route(route, openapi_spec, framework="pecan")

    if args.api_ref_src:
        merge_api_ref_doc(
            openapi_spec, args.api_ref_src, allow_strip_version=False
        )

    self.dump_openapi(
        openapi_spec,
        Path(impl_path),
        args.validate,
        "key-manager",
        args.yaml_version,
    )

    # Re-point the "v1.yaml" alias at the file that was just written
    alias = impl_path.parent / "v1.yaml"
    alias.unlink(missing_ok=True)
    alias.symlink_to(impl_path.name)

    return impl_path
def _post_process_operation_hook(
self, openapi_spec, operation_spec, path: str | None = None
):
"""Hook to allow service specific generator to modify details"""
for resource_mod in self.RESOURCE_MODULES:
hook = getattr(resource_mod, "_post_process_operation_hook", None)
if hook:
hook(openapi_spec, operation_spec, path=path)
def _get_schema_ref(
self,
openapi_spec,
name,
description=None,
schema_def=None,
action_name=None,
):
# Invoke modularized schema _get_schema_ref
for resource_mod in self.RESOURCE_MODULES:
hook = getattr(resource_mod, "_get_schema_ref", None)
if hook:
(ref, mime_type, matched) = hook(
openapi_spec, name, description, schema_def, action_name
)
if matched:
return (ref, mime_type)
# Default
(ref, mime_type) = super()._get_schema_ref(
openapi_spec,
name,
description,
schema_def=schema_def,
action_name=action_name,
)
return (ref, mime_type)