Fix network external_gateway_info schema

Recently validation of the external_gateway_info has been modified in
the Neutron so that we are not able to detect the schema anymore. Adapt
it to how it "look like it is intended".

Change-Id: I8976a019cd11b4cb3d8780553bb72b7ce6627519
This commit is contained in:
gtema
2025-02-28 17:33:30 +00:00
committed by Artem Goncharov
parent 4515701606
commit ea8f60e66c
4 changed files with 209 additions and 202 deletions

View File

@@ -317,7 +317,6 @@ class OpenStackServerSourceBase:
if path_elements and VERSION_RE.match(path_elements[0]):
path_elements.pop(0)
operation_tags = self._get_tags_for_url(path)
print(f"tags={operation_tags} for {path}")
# Build path parameters (/foo/{foo_id}/bar/{id} => $foo_id, $foo_bar_id)
# Since for same path we are here multiple times check presence of

View File

@@ -1200,7 +1200,7 @@ class NeutronGenerator(OpenStackServerSourceBase):
def get_schema(param_data):
"""Convert Neutron API definition into json schema"""
schema: dict[str, Any] = {}
validate = param_data.get("validate")
validate = param_data.get("validate", {})
convert_to = param_data.get("convert_to")
typ_ = "string"
if convert_to:
@@ -1212,204 +1212,202 @@ def get_schema(param_data):
typ_ = "integer"
if validate:
if "type:uuid" in validate:
schema = {"type": "string", "format": "uuid"}
elif "type:uuid_or_none" in validate:
schema = {"type": ["string", "null"], "format": "uuid"}
elif "type:uuid_list" in validate:
schema = {
"type": "array",
"items": {"type": "string", "format": "uuid"},
}
elif "type:string" in validate:
length = validate.get("type:string")
schema = {"type": "string"}
if length:
schema["maxLength"] = length
elif "type:string_or_none" in validate:
length = validate.get("type:string_or_none")
schema = {"type": ["string", "null"]}
if length:
schema["maxLength"] = length
elif "type:list_of_unique_strings" in validate:
length = validate.get("type:list_of_unique_strings")
schema = {
"type": "array",
"items": {"type": "string"},
"uniqueItems": True,
}
if length:
schema["items"]["maxLength"] = length
elif "type:dict_or_none" in validate:
schema = {"type": ["object", "null"]}
elif "type:mac_address" in validate:
schema = {"type": "string"}
elif "type:dns_host_name" in validate:
length = validate.get("type:dns_host_name")
schema = {"type": "string", "format": "hostname"}
if length:
schema["maxLength"] = length
elif "type:values" in validate:
schema = {"type": typ_, "enum": list(validate["type:values"])}
elif "type:range" in validate:
r = validate["type:range"]
schema = {"type": "number", "minimum": r[0], "maximum": r[1]}
elif "type:range_or_none" in validate:
r = validate["type:range_or_none"]
schema = {
"type": ["number", "null"],
"minimum": r[0],
"maximum": r[1],
}
elif "type:port_range" in validate:
r = validate["type:port_range"]
schema = {"type": "number", "minimum": r[0], "maximum": r[1]}
elif "type:external_gw_info" in validate:
schema = {
schema_data = validate
else:
schema_data = param_data
if "type:uuid" in schema_data:
schema = {"type": "string", "format": "uuid"}
elif "type:uuid_or_none" in schema_data:
schema = {"type": ["string", "null"], "format": "uuid"}
elif "type:uuid_list" in schema_data:
schema = {
"type": "array",
"items": {"type": "string", "format": "uuid"},
}
elif "type:string" in schema_data:
length = validate.get("type:string")
schema = {"type": "string"}
if length:
schema["maxLength"] = length
elif "type:string_or_none" in schema_data:
length = validate.get("type:string_or_none")
schema = {"type": ["string", "null"]}
if length:
schema["maxLength"] = length
elif "type:list_of_unique_strings" in schema_data:
length = validate.get("type:list_of_unique_strings")
schema = {
"type": "array",
"items": {"type": "string"},
"uniqueItems": True,
}
if length:
schema["items"]["maxLength"] = length
elif "type:dict_or_none" in schema_data:
schema = {"type": ["object", "null"]}
elif "type:mac_address" in schema_data:
schema = {"type": "string"}
elif "type:dns_host_name" in schema_data:
length = validate.get("type:dns_host_name")
schema = {"type": "string", "format": "hostname"}
if length:
schema["maxLength"] = length
elif "type:values" in schema_data:
schema = {"type": typ_, "enum": list(validate["type:values"])}
elif "type:range" in schema_data:
r = validate["type:range"]
schema = {"type": "number", "minimum": r[0], "maximum": r[1]}
elif "type:range_or_none" in schema_data:
r = validate["type:range_or_none"]
schema = {"type": ["number", "null"], "minimum": r[0], "maximum": r[1]}
elif "type:port_range" in schema_data:
r = validate["type:port_range"]
schema = {"type": "number", "minimum": r[0], "maximum": r[1]}
elif "type:external_gw_info" in schema_data:
schema = {
"type": "object",
"properties": {
"network_id": {"type": "string", "format": "uuid"},
"enable_snat": {"type": "boolean"},
"external_fixed_ips": {
"type": "array",
"items": {
"type": "object",
"properties": {
"ip_address": {"type": "string"},
"subnet_id": {"type": "string", "format": "uuid"},
},
},
},
},
"required": ["network_id"],
}
elif "type:availability_zone_hint_list" in schema_data:
schema = {"type": "array", "items": {"type": "string"}}
elif "type:hostroutes" in schema_data:
schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"network_id": {"type": "string", "format": "uuid"},
"enable_snat": {"type": "boolean"},
"external_fixed_ips": {
"type": "array",
"items": {
"type": "object",
"properties": {
"ip_address": {"type": "string"},
"subnet_id": {
"type": "string",
"format": "uuid",
},
},
},
"destination": {"type": "string"},
"nexthop": {"type": "string"},
},
},
}
elif "type:network_segments" in schema_data:
schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"provider:segmentation_id": {"type": "integer"},
"provider:physical_network": {"type": "string"},
"provider:network_type": {"type": "string"},
},
},
}
elif "type:non_negative" in schema_data:
schema = {"type": "integer", "minimum": 0}
elif "type:dns_domain_name" in schema_data:
length = schema_data.get("type:dns_domain_name")
schema = {"type": "string", "format": "hostname"}
if length:
schema["maxLength"] = length
elif "type:fixed_ips" in schema_data:
schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"ip_address": {
"type": "string",
"description": "IP Address",
},
"subnet_id": {
"type": "string",
"description": "The subnet ID from which the IP address is assigned",
},
},
"required": ["network_id"],
}
elif "type:availability_zone_hint_list" in validate:
schema = {"type": "array", "items": {"type": "string"}}
elif "type:hostroutes" in validate:
schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"destination": {"type": "string"},
"nexthop": {"type": "string"},
},
},
}
elif "type:allowed_address_pairs" in schema_data:
schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"ip_address": {"type": "string"},
"max_address": {"type": "string"},
},
}
elif "type:network_segments" in validate:
schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"provider:segmentation_id": {"type": "integer"},
"provider:physical_network": {"type": "string"},
"provider:network_type": {"type": "string"},
},
},
}
elif "type:list_of_any_key_specs_or_none" in schema_data:
logging.warning("TODO: Implement type:list_of_any_key_specs_or_none")
schema = {
"type": "array",
"items": {"type": "object", "extraProperties": True},
"x-openstack": {"todo": "implementme"},
}
elif "type:subnet_list" in schema_data:
schema = {"type": "array", "items": {"type": "string"}}
elif "type:service_plugin_type" in schema_data:
schema = {"type": "string"}
elif "type:ip_address" in schema_data:
schema = {"type": "string"}
elif "type:ip_address_or_none" in schema_data:
schema = {"type": "string"}
elif "type:subnet_or_none" in schema_data:
schema = {"type": ["string", "null"]}
elif "type:fip_dns_host_name" in schema_data:
length = schema_data.get("type:fip_dns_host_name")
schema = {"type": "string"}
if length:
schema["maxLength"] = length
elif "type:name_not_default" in schema_data:
length = schema_data.get("type:name_not_default")
schema = {"type": "string"}
if length:
schema["maxLength"] = length
elif "type:not_empty_string" in schema_data:
length = schema_data.get("type:not_empty_string")
schema = {"type": "string"}
if length:
schema["maxLength"] = length
elif "type:subnetpool_id_or_none" in schema_data:
schema = {"type": ["string", "null"]}
elif "type:ip_pools" in schema_data:
schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"start": {"type": "string"},
"end": {"type": "string"},
},
}
elif "type:non_negative" in validate:
schema = {"type": "integer", "minimum": 0}
elif "type:dns_domain_name" in validate:
length = validate.get("type:dns_domain_name")
schema = {"type": "string", "format": "hostname"}
if length:
schema["maxLength"] = length
elif "type:fixed_ips" in validate:
schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"ip_address": {
"type": "string",
"description": "IP Address",
},
"subnet_id": {
"type": "string",
"description": "The subnet ID from which the IP address is assigned",
},
},
},
}
elif "type:allowed_address_pairs" in validate:
schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"ip_address": {"type": "string"},
"max_address": {"type": "string"},
},
},
}
elif "type:list_of_any_key_specs_or_none" in validate:
logging.warning(
"TODO: Implement type:list_of_any_key_specs_or_none"
)
schema = {
"type": "array",
"items": {"type": "object", "extraProperties": True},
"x-openstack": {"todo": "implementme"},
}
elif "type:subnet_list" in validate:
schema = {"type": "array", "items": {"type": "string"}}
elif "type:service_plugin_type" in validate:
schema = {"type": "string"}
elif "type:ip_address" in validate:
schema = {"type": "string"}
elif "type:ip_address_or_none" in validate:
schema = {"type": "string"}
elif "type:subnet_or_none" in validate:
schema = {"type": ["string", "null"]}
elif "type:fip_dns_host_name" in validate:
length = validate.get("type:fip_dns_host_name")
schema = {"type": "string"}
if length:
schema["maxLength"] = length
elif "type:name_not_default" in validate:
length = validate.get("type:name_not_default")
schema = {"type": "string"}
if length:
schema["maxLength"] = length
elif "type:not_empty_string" in validate:
length = validate.get("type:not_empty_string")
schema = {"type": "string"}
if length:
schema["maxLength"] = length
elif "type:subnetpool_id_or_none" in validate:
schema = {"type": ["string", "null"]}
elif "type:ip_pools" in validate:
schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"start": {"type": "string"},
"end": {"type": "string"},
},
},
}
elif "type:nameservers" in validate:
schema = {"type": "array", "items": {"type": "string"}}
elif "type:list_of_subnet_service_types" in validate:
schema = {
"type": "array",
"description": "The service types associated with the subnet",
"items": {"type": "string"},
}
elif "type:dict_or_nodata" in validate:
schema = get_schema(validate["type:dict_or_nodata"])
elif "type:dict_or_empty" in validate:
schema = get_schema(validate["type:dict_or_empty"])
elif "type:list_of_subnets_or_none" in validate:
schema = {"type": "array", "items": {"type": "string"}}
else:
raise RuntimeError(f"Unsupported type {validate} in {param_data}")
schema = {"type": "string"}
},
}
elif "type:nameservers" in schema_data:
schema = {"type": "array", "items": {"type": "string"}}
elif "type:list_of_subnet_service_types" in schema_data:
schema = {
"type": "array",
"description": "The service types associated with the subnet",
"items": {"type": "string"},
}
elif "type:dict_or_nodata" in schema_data:
schema = {"type": ["object", "null"], "properties": {}, "required": []}
for k, v in schema_data.get("type:dict_or_nodata", {}).items():
v_schema = get_schema(v)
if v_schema:
schema["properties"][k] = v_schema
required = v.get("required", False)
if required:
schema["required"].append(k)
elif "type:dict_or_empty" in schema_data:
schema = get_schema(schema_data["type:dict_or_empty"])
elif "type:list_of_subnets_or_none" in schema_data:
schema = {"type": "array", "items": {"type": "string"}}
if convert_to:
# Nice way to get type of the field, isn't it?
if convert_to.__name__ == "convert_to_boolean":

View File

@@ -666,13 +666,15 @@ class RequestTypeManager(common_rust.TypeManager):
field_data_type = self.convert_model(field.data_type)
if isinstance(field_data_type, self.option_type_class):
# Unwrap Option into "is_nullable"
# NOTE: but perhaps
# Option<Option> is better (not set vs set explicitly to None
# )
# NOTE: but perhaps Option<Option> is better (not set vs set
# explicitly to None)
is_nullable = True
if isinstance(
field_data_type.item_type,
(common_rust.Array, DictionaryInput, String),
) or (
mod.name != "Request"
and isinstance(field_data_type.item_type, StructInput)
):
# Unwrap Option<Option<...>>
field_data_type = field_data_type.item_type
@@ -729,8 +731,8 @@ class RequestTypeManager(common_rust.TypeManager):
is_optional=not field.is_required,
is_nullable=is_nullable,
)
if mod.name != "Request" and isinstance(
field_data_type, struct_class
if mod.name != "Request" and (
isinstance(field_data_type, struct_class)
):
field_data_type.is_group = True
field_data_type.is_required = field.is_required

View File

@@ -373,13 +373,21 @@ class ResponseTypeManager(common_rust.TypeManager):
is_nullable: bool = False
field_data_type = self.convert_model(field.data_type)
if isinstance(field_data_type, self.option_type_class):
# Unwrap Option into "is_nullable" NOTE: but perhaps
# Option<Option> is better (not set vs set explicitly to None
# )
# Unwrap Option into "is_nullable"
# NOTE: but perhaps Option<Option> is better (not set vs set
# explicitly to None)
is_nullable = True
if isinstance(field_data_type.item_type, common_rust.Array):
# Unwrap Option<Option<Vec...>>
field_data_type = field_data_type.item_type
elif not isinstance(
field_data_type.item_type, BasePrimitiveType
):
# Everything more complex than a primitive goes to Value
field_data_type = common_rust.JsonValue(
**field_data_type.model_dump()
)
self.ignored_models.append(field.data_type)
elif not isinstance(field_data_type, BasePrimitiveType):
field_data_type = common_rust.JsonValue(
**field_data_type.model_dump()