Files
codegenerator/codegenerator/openapi/barbican.py
Artem Goncharov 2d36da5a7f Hardcode barbican secret_store schemas
Change-Id: I61312727e8fbb18e5883963f85f0f413175aa7cf
2025-02-12 13:39:40 +01:00

497 lines
17 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import inspect
from multiprocessing import Process
from pathlib import Path
from unittest import mock
import fixtures
from codegenerator.common.schema import SpecSchema
from codegenerator.openapi.base import OpenStackServerSourceBase
from codegenerator.openapi.utils import merge_api_ref_doc
from codegenerator.openapi.barbican_schemas import secret
from codegenerator.openapi.barbican_schemas import secret_store
from ruamel.yaml.scalarstring import LiteralScalarString
# OpenAPI spec generator for the Barbican (key-manager) service.
class BarbicanGenerator(OpenStackServerSourceBase):
# Left empty by this generator; presumably read by the base class when
# tagging operations - TODO confirm against OpenStackServerSourceBase.
URL_TAG_MAP = {}
# Schema modules that are probed (via getattr) for optional
# ``_post_process_operation_hook`` and ``_get_schema_ref`` callables by the
# methods of the same name in this class.
RESOURCE_MODULES = [secret, secret_store]
def __init__(self):
pass
def _api_ver_major(self, ver):
return ver.ver_major
def _api_ver_minor(self, ver):
return ver.ver_minor
def _api_ver(self, ver):
return (ver.ver_major, ver.ver_minor)
def _build_routes(
self, mapper, node, path="", param_url_map: dict[str, str] = {}
):
resource: str | None = None
# Construct resource name from the path
parent = path.split("/")[-1]
if parent == "v1":
resource = ""
elif parent.endswith("ies"):
resource = parent[0 : len(parent) - 3] + "y"
else:
resource = parent[0:-1]
for part in [x for x in dir(node) if callable(getattr(node, x))]:
# Iterate over functions to find what is exposed on the current
# level
obj = getattr(node, part)
_pecan = getattr(obj, "_pecan", None)
exposed = getattr(obj, "exposed", None)
if _pecan and exposed:
# Only whatever is pecan exposed is of interest
conditions = {}
action = None
url = path
# resource = None
# parent = url.split("/")[-1]
# Identify the action from function name
# https://pecan.readthedocs.io/en/latest/rest.html#url-mapping
if part == "on_get":
if not resource:
continue
conditions["method"] = ["GET"]
action = "show"
url += f"/{{{resource}_id}}"
elif part == "index":
conditions["method"] = ["GET"]
action = "list"
elif part == "get":
conditions["method"] = ["GET"]
action = "get"
# "Get" is tricky, it can be normal and root, so need to inspect params
sig = inspect.signature(obj)
for pname, pval in sig.parameters.items():
if "id" in pname and pval.default == pval.empty:
url += f"/{{{resource}_id}}"
elif part == "on_post":
conditions["method"] = ["POST"]
action = "create"
# url += f"/{{{resource}_id}}"
elif part == "on_put":
conditions["method"] = ["PUT"]
action = "update"
url += f"/{{{resource}_id}}"
elif part == "on_delete":
conditions["method"] = ["DELETE"]
action = "delete"
url += f"/{{{resource}_id}}"
elif part.startswith("get_"):
conditions["method"] = ["GET"]
action = part[4:]
url += f"/{action.replace('_', '-')}"
if action:
# If we identified method as "interesting" register it into
# the routes mapper
mapper.connect(
None,
url,
controller=obj,
action=action,
conditions=conditions,
)
if not hasattr(node, "__dict__"):
return
for subcontroller, v in (
node.__dict__.items()
if isinstance(node, object)
else node.__class__.__dict__.items()
):
# Iterate over node attributes for subcontrollers
if subcontroller.startswith("_"):
continue
# if subcontroller in ["central_api", "__wrapped__", "SORT_KEYS"]:
# # Not interested in those
# continue
subpath = f"{path}/{subcontroller}"
next_param_url_map: dict = {
k: f"{v}/{subcontroller}" for k, v in param_url_map.items()
}
self._build_routes(mapper, v, subpath, next_param_url_map)
return
def generate(self, target_dir, args):
    """Run ``_generate`` in a child process and wait for it to finish.

    :param target_dir: output directory forwarded to ``_generate``
    :param args: parsed generator arguments forwarded to ``_generate``
    :raises RuntimeError: when the child process exits with a non-zero
        exit code
    """
    worker = Process(target=self._generate, args=[target_dir, args])
    worker.start()
    worker.join()
    if worker.exitcode == 0:
        return
    raise RuntimeError("Error generating Barbican OpenAPI schema")
def _generate(self, target_dir, args):
    """Build and write the key-manager OpenAPI spec from the Barbican app.

    Barbican and ``routes`` are imported inside the method.  The spec file
    is loaded from (or created for) ``openapi_specs/key-manager`` under
    ``target_dir``, routes are collected from the pecan controller tree
    plus an explicit table of id-scoped routes, each route is processed
    into the spec, and the result is dumped.  A ``v1.yaml`` symlink to the
    versioned file is (re)created next to it.

    :param target_dir: directory under which the spec tree is written
    :param args: parsed arguments; ``api_ref_src`` and ``validate`` are read
    :returns: path of the written versioned spec file
    """
    from barbican.api import app
    from barbican.api.controllers import acls
    from barbican.api.controllers import containers
    from barbican.api.controllers import consumers
    from barbican.api.controllers import orders
    from barbican.api.controllers import quotas
    from barbican.api.controllers import secrets
    from barbican.api.controllers import secretmeta
    from barbican.api.controllers import secretstores
    from barbican.api.controllers import transportkeys
    from barbican.api.controllers import versions
    from routes import Mapper

    self.api_version = versions.MAX_API_VERSION
    self.min_api_version = versions.MIN_API_VERSION

    work_dir = Path(target_dir)
    work_dir.mkdir(parents=True, exist_ok=True)

    impl_path = (
        work_dir
        / "openapi_specs"
        / "key-manager"
        / f"v{self.api_version}.yaml"
    )
    impl_path.parent.mkdir(parents=True, exist_ok=True)

    openapi_spec = self.load_openapi(impl_path)
    if not openapi_spec:
        # Nothing usable was loaded: start from a fresh skeleton
        spec_info = {
            "title": "OpenStack key-manager API",
            "description": LiteralScalarString(
                "key-manager API provided by Barbican service"
            ),
            "version": self.api_version,
        }
        spec_components = {
            "securitySchemes": {
                "ApiKeyAuth": {
                    "type": "apiKey",
                    "in": "header",
                    "name": "X-Auth-Token",
                }
            }
        }
        openapi_spec = SpecSchema(
            info=spec_info,
            openapi="3.1.0",
            security=[{"ApiKeyAuth": []}],
            components=spec_components,
        )

    self.app = app.build_wsgi_app(controller=None)
    self.root = self.app.root

    mapper = Mapper()
    self._build_routes(mapper, self.root, "/v1")

    # Routes registered by hand, as (url, controller, action, HTTP method).
    # The order of the entries is the order in which they are connected.
    secret_url = "/v1/secrets/{secret_id}"
    secret_acl_url = "/v1/secrets/{secret_id}/acl"
    secret_meta_url = "/v1/secrets/{secret_id}/metadata"
    secret_meta_key_url = "/v1/secrets/{secret_id}/metadata/{key}"
    secret_consumers_url = "/v1/secrets/{secret_id}/consumers"
    preferred_store_url = "/v1/secret-stores/{secret_store_id}/preferred"
    container_url = "/v1/containers/{container_id}"
    container_secrets_url = "/v1/containers/{container_id}/secrets"
    container_consumers_url = "/v1/containers/{container_id}/consumers"
    order_url = "/v1/orders/{order_id}"
    transport_key_url = "/v1/transport_keys/{transport_key_id}"
    project_quota_url = "/v1/project-quotas/{project_id}"

    secret_ctl = secrets.SecretController
    secret_acl_ctl = acls.SecretACLsController
    secret_meta_ctl = secretmeta.SecretMetadataController
    secret_metadatum_ctl = secretmeta.SecretMetadatumController
    secret_consumers_ctl = consumers.SecretConsumersController
    preferred_store_ctl = secretstores.PreferredSecretStoreController
    container_ctl = containers.ContainerController
    container_secrets_ctl = containers.ContainersSecretsController
    container_consumers_ctl = consumers.ContainerConsumersController
    order_ctl = orders.OrderController
    transport_key_ctl = transportkeys.TransportKeyController
    project_quota_ctl = quotas.ProjectQuotasController

    manual_routes = [
        # Secrets
        (secret_url, secret_ctl.on_get, "get", "GET"),
        (secret_url, secret_ctl.on_put, "create", "PUT"),
        (secret_url, secret_ctl.on_delete, "delete", "DELETE"),
        (f"{secret_url}/payload", secret_ctl.payload, "payload", "GET"),
        # Secret ACL
        (secret_acl_url, secret_acl_ctl.on_get, "get", "GET"),
        (secret_acl_url, secret_acl_ctl.on_put, "create", "PUT"),
        (secret_acl_url, secret_acl_ctl.on_patch, "update", "PATCH"),
        (secret_acl_url, secret_acl_ctl.on_delete, "delete", "DELETE"),
        # Secret metadata
        (secret_meta_url, secret_meta_ctl.on_get, "list", "GET"),
        (secret_meta_url, secret_meta_ctl.on_put, "create", "PUT"),
        (secret_meta_url, secret_meta_ctl.on_post, "replace", "POST"),
        (secret_meta_key_url, secret_metadatum_ctl.on_get, "show", "GET"),
        (secret_meta_key_url, secret_metadatum_ctl.on_put, "update", "PUT"),
        (
            secret_meta_key_url,
            secret_metadatum_ctl.on_delete,
            "delete",
            "DELETE",
        ),
        # Secret consumers
        (secret_consumers_url, secret_consumers_ctl.on_get, "list", "GET"),
        (
            secret_consumers_url,
            secret_consumers_ctl.on_post,
            "create",
            "POST",
        ),
        (
            secret_consumers_url,
            secret_consumers_ctl.on_delete,
            "delete",
            "DELETE",
        ),
        # Secret stores
        (preferred_store_url, preferred_store_ctl.on_delete, "delete", "DELETE"),
        (preferred_store_url, preferred_store_ctl.on_post, "set", "POST"),
        # Containers
        (container_url, container_ctl.on_get, "get", "GET"),
        (container_url, container_ctl.on_delete, "delete", "DELETE"),
        (
            container_secrets_url,
            container_secrets_ctl.on_post,
            "create",
            "POST",
        ),
        (
            container_secrets_url,
            container_secrets_ctl.on_delete,
            "delete",
            "DELETE",
        ),
        # Container consumers
        (
            container_consumers_url,
            container_consumers_ctl.on_get,
            "get",
            "GET",
        ),
        (
            container_consumers_url,
            container_consumers_ctl.on_post,
            "post",
            "POST",
        ),
        (
            container_consumers_url,
            container_consumers_ctl.on_delete,
            "delete",
            "DELETE",
        ),
        # Orders
        (order_url, order_ctl.on_get, "get", "GET"),
        (order_url, order_ctl.on_delete, "delete", "DELETE"),
        # Transport keys
        (transport_key_url, transport_key_ctl.on_get, "get", "GET"),
        (transport_key_url, transport_key_ctl.on_delete, "delete", "DELETE"),
        # Quotas
        (project_quota_url, project_quota_ctl.on_put, "update", "PUT"),
        (project_quota_url, project_quota_ctl.on_delete, "delete", "DELETE"),
    ]
    for route_url, controller, action, http_method in manual_routes:
        mapper.connect(
            None,
            route_url,
            controller=controller,
            action=action,
            conditions={"method": [http_method]},
        )

    for route in mapper.matchlist:
        self._process_route(route, openapi_spec, framework="pecan")

    if args.api_ref_src:
        merge_api_ref_doc(
            openapi_spec, args.api_ref_src, allow_strip_version=False
        )

    self.dump_openapi(
        openapi_spec, Path(impl_path), args.validate, "key-manager"
    )

    # Refresh the "v1.yaml" alias pointing at the versioned file
    alias = Path(impl_path.parent, "v1.yaml")
    alias.unlink(missing_ok=True)
    alias.symlink_to(impl_path.name)

    return impl_path
def _post_process_operation_hook(
self, openapi_spec, operation_spec, path: str | None = None
):
"""Hook to allow service specific generator to modify details"""
for resource_mod in self.RESOURCE_MODULES:
hook = getattr(resource_mod, "_post_process_operation_hook", None)
if hook:
hook(openapi_spec, operation_spec, path=path)
def _get_schema_ref(
self,
openapi_spec,
name,
description=None,
schema_def=None,
action_name=None,
):
# Invoke modularized schema _get_schema_ref
for resource_mod in self.RESOURCE_MODULES:
hook = getattr(resource_mod, "_get_schema_ref", None)
if hook:
(ref, mime_type, matched) = hook(
openapi_spec, name, description, schema_def, action_name
)
if matched:
return (ref, mime_type)
# Default
(ref, mime_type) = super()._get_schema_ref(
openapi_spec,
name,
description,
schema_def=schema_def,
action_name=action_name,
)
return (ref, mime_type)