Merge "Replace CRLF by LF"

This commit is contained in:
Zuul 2024-03-15 05:50:49 +00:00 committed by Gerrit Code Review
commit 9c797d08e4
16 changed files with 2099 additions and 2099 deletions

View File

@ -1,6 +1,6 @@
--- ---
features: features:
- Support Tacker service to obtain an OAuth 2.0 access token from an - Support Tacker service to obtain an OAuth 2.0 access token from an
external authorization server, and then use the access token to access external authorization server, and then use the access token to access
related OpenStack services that uses the external_oauth2_token filter related OpenStack services that uses the external_oauth2_token filter
provided by the keystone middleware for permission authentication. provided by the keystone middleware for permission authentication.

View File

@ -1,11 +1,11 @@
kind: ClusterRoleBinding kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1 apiVersion: rbac.authorization.k8s.io/v1
metadata: metadata:
name: oidc-cluster-admin-binding name: oidc-cluster-admin-binding
roleRef: roleRef:
kind: ClusterRole kind: ClusterRole
name: cluster-admin name: cluster-admin
apiGroup: rbac.authorization.k8s.io apiGroup: rbac.authorization.k8s.io
subjects: subjects:
- kind: User - kind: User
name: end-user name: end-user

View File

@ -1,12 +1,12 @@
[req] [req]
req_extensions = v3_req req_extensions = v3_req
distinguished_name = req_distinguished_name distinguished_name = req_distinguished_name
[req_distinguished_name] [req_distinguished_name]
[ v3_req ] [ v3_req ]
basicConstraints = CA:FALSE basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names subjectAltName = @alt_names
[alt_names] [alt_names]
IP.1 = 127.0.0.1 IP.1 = 127.0.0.1

View File

@ -1,35 +1,35 @@
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import tacker.vnfm.lcm_user_data.utils as UserDataUtil import tacker.vnfm.lcm_user_data.utils as UserDataUtil
from tacker.vnfm.lcm_user_data.abstract_user_data import AbstractUserData from tacker.vnfm.lcm_user_data.abstract_user_data import AbstractUserData
class SampleUserData(AbstractUserData): class SampleUserData(AbstractUserData):
@staticmethod @staticmethod
def instantiate(base_hot_dict=None, def instantiate(base_hot_dict=None,
vnfd_dict=None, vnfd_dict=None,
inst_req_info=None, inst_req_info=None,
grant_info=None): grant_info=None):
api_param = UserDataUtil.get_diff_base_hot_param_from_api( api_param = UserDataUtil.get_diff_base_hot_param_from_api(
base_hot_dict, inst_req_info) base_hot_dict, inst_req_info)
initial_param_dict = \ initial_param_dict = \
UserDataUtil.create_initial_param_server_port_dict(base_hot_dict) UserDataUtil.create_initial_param_server_port_dict(base_hot_dict)
vdu_flavor_dict = \ vdu_flavor_dict = \
UserDataUtil.create_vdu_flavor_capability_name_dict(vnfd_dict) UserDataUtil.create_vdu_flavor_capability_name_dict(vnfd_dict)
vdu_image_dict = UserDataUtil.create_sw_image_dict(vnfd_dict) vdu_image_dict = UserDataUtil.create_sw_image_dict(vnfd_dict)
cpd_vl_dict = UserDataUtil.create_network_dict(inst_req_info, cpd_vl_dict = UserDataUtil.create_network_dict(inst_req_info,
initial_param_dict) initial_param_dict)
final_param_dict = UserDataUtil.create_final_param_dict( final_param_dict = UserDataUtil.create_final_param_dict(
initial_param_dict, vdu_flavor_dict, vdu_image_dict, cpd_vl_dict) initial_param_dict, vdu_flavor_dict, vdu_image_dict, cpd_vl_dict)
return {**final_param_dict, **api_param} return {**final_param_dict, **api_param}

View File

@ -1,72 +1,72 @@
tosca_definitions_version: tosca_simple_yaml_1_2 tosca_definitions_version: tosca_simple_yaml_1_2
description: VNF template definition description: VNF template definition
imports: imports:
- etsi_nfv_sol001_common_types.yaml - etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml - etsi_nfv_sol001_vnfd_types.yaml
node_types: node_types:
Sample.VNF.Node: Sample.VNF.Node:
derived_from: tosca.nodes.nfv.VNF derived_from: tosca.nodes.nfv.VNF
properties: properties:
descriptor_id: descriptor_id:
type: string type: string
default: '3b3c61e4-26b6-4686-80fc-e9ff83010c08' default: '3b3c61e4-26b6-4686-80fc-e9ff83010c08'
descriptor_version: descriptor_version:
type: string type: string
constraints: [ valid_values: [ '1.0' ] ] constraints: [ valid_values: [ '1.0' ] ]
default: '1.0' default: '1.0'
provider: provider:
type: string type: string
constraints: [ valid_values: [ Sample ] ] constraints: [ valid_values: [ Sample ] ]
default: Sample default: Sample
product_name: product_name:
type: string type: string
constraints: [ valid_values: [ Node ] ] constraints: [ valid_values: [ Node ] ]
default: Node default: Node
software_version: software_version:
type: string type: string
constraints: [ valid_values: [ '10.1' ] ] constraints: [ valid_values: [ '10.1' ] ]
default: '10.1' default: '10.1'
vnfm_info: vnfm_info:
type: list type: list
entry_schema: entry_schema:
type: string type: string
constraints: [ valid_values: [ Tacker ] ] constraints: [ valid_values: [ Tacker ] ]
default: [ Tacker ] default: [ Tacker ]
flavour_id: flavour_id:
type: string type: string
constraints: [ valid_values: [ ha, scalable ] ] constraints: [ valid_values: [ ha, scalable ] ]
default: ha default: ha
flavour_description: flavour_description:
type: string type: string
default: 'vnf' default: 'vnf'
requirements: requirements:
- VNF0_extnet0: - VNF0_extnet0:
capability: tosca.capabilities.nfv.VirtualLinkable capability: tosca.capabilities.nfv.VirtualLinkable
relationship: tosca.relationships.nfv.VirtualLinksTo relationship: tosca.relationships.nfv.VirtualLinksTo
occurrences: [ 0, 1 ] occurrences: [ 0, 1 ]
- VNF1_extnet0: - VNF1_extnet0:
capability: tosca.capabilities.nfv.VirtualLinkable capability: tosca.capabilities.nfv.VirtualLinkable
relationship: tosca.relationships.nfv.VirtualLinksTo relationship: tosca.relationships.nfv.VirtualLinksTo
occurrences: [ 0, 1 ] occurrences: [ 0, 1 ]
interfaces: interfaces:
Vnflcm: Vnflcm:
type: tosca.interfaces.nfv.Vnflcm type: tosca.interfaces.nfv.Vnflcm
instantiate: [] instantiate: []
instantiate_start: [] instantiate_start: []
instantiate_end: [] instantiate_end: []
scale: [] scale: []
scale_start: [] scale_start: []
scale_end: [] scale_end: []
heal: [] heal: []
heal_start: [] heal_start: []
heal_end: [] heal_end: []
terminate: [] terminate: []
terminate_start: [] terminate_start: []
terminate_end: [] terminate_end: []
modify_information: [] modify_information: []
modify_information_start: [] modify_information_start: []
modify_information_end: [] modify_information_end: []

View File

@ -1,31 +1,31 @@
tosca_definitions_version: tosca_simple_yaml_1_2 tosca_definitions_version: tosca_simple_yaml_1_2
description: VNF definitions description: VNF definitions
imports: imports:
- etsi_nfv_sol001_common_types.yaml - etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml - etsi_nfv_sol001_vnfd_types.yaml
- Common.yaml - Common.yaml
- df_ha.yaml - df_ha.yaml
- df_scalable.yaml - df_scalable.yaml
topology_template: topology_template:
inputs: inputs:
selected_flavour: selected_flavour:
type: string type: string
description: VNF deployment flavour selected by the consumer. Itis provided in the API. description: VNF deployment flavour selected by the consumer. Itis provided in the API.
node_templates: node_templates:
VNF: VNF:
type: Sample.VNF.Node type: Sample.VNF.Node
properties: properties:
flavour_id: { get_input: selected_flavour } flavour_id: { get_input: selected_flavour }
flavour_description: 'vnf' flavour_description: 'vnf'
descriptor_id: 75aaa9fa-9c79-dcf5-bda2-5b98a08c9f54 descriptor_id: 75aaa9fa-9c79-dcf5-bda2-5b98a08c9f54
provider: Sample provider: Sample
product_name: Node product_name: Node
software_version: '10.1' software_version: '10.1'
descriptor_version: '1.0' descriptor_version: '1.0'
vnfm_info: vnfm_info:
- Tacker - Tacker
requirements: requirements:

View File

@ -1,232 +1,232 @@
tosca_definitions_version: tosca_simple_yaml_1_2 tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample VNF ha DF description: Sample VNF ha DF
imports: imports:
- etsi_nfv_sol001_common_types.yaml - etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml - etsi_nfv_sol001_vnfd_types.yaml
- Common.yaml - Common.yaml
topology_template: topology_template:
inputs: inputs:
descriptor_id: descriptor_id:
type: string type: string
descriptor_version: descriptor_version:
type: string type: string
provider: provider:
type: string type: string
product_name: product_name:
type: string type: string
software_version: software_version:
type: string type: string
vnfm_info: vnfm_info:
type: list type: list
entry_schema: entry_schema:
type: string type: string
flavour_id: flavour_id:
type: string type: string
flavour_description: flavour_description:
type: string type: string
configurable_properties: configurable_properties:
type: map type: map
substitution_mappings: substitution_mappings:
node_type: Sample.VNF.Node node_type: Sample.VNF.Node
properties: properties:
flavour_id: ha flavour_id: ha
requirements: requirements:
VDU0_extnet: [ VDU0_extCP0, external_virtual_link ] VDU0_extnet: [ VDU0_extCP0, external_virtual_link ]
VDU1_extnet: [ VDU1_extCP0, external_virtual_link ] VDU1_extnet: [ VDU1_extCP0, external_virtual_link ]
VDU_extnet: [ VDU_extvCP, external_virtual_link ] VDU_extnet: [ VDU_extvCP, external_virtual_link ]
RT_extnet: [ RT_extCP, external_virtual_link ] RT_extnet: [ RT_extCP, external_virtual_link ]
node_templates: node_templates:
VNF: VNF:
type: Sample.VNF.Node type: Sample.VNF.Node
properties: properties:
flavour_description: 'ha' flavour_description: 'ha'
configurable_properties: configurable_properties:
is_autoscale_enabled: false is_autoscale_enabled: false
is_autoheal_enabled: false is_autoheal_enabled: false
vnfm_info: vnfm_info:
- Tacker - Tacker
interfaces: interfaces:
Vnflcm: Vnflcm:
instantiate: [] instantiate: []
instantiate_start: [] instantiate_start: []
instantiate_end: [] instantiate_end: []
scale: [] scale: []
scale_start: [] scale_start: []
scale_end: [] scale_end: []
heal: [] heal: []
heal_start: [] heal_start: []
heal_end: [] heal_end: []
terminate: [] terminate: []
terminate_start: [] terminate_start: []
terminate_end: [] terminate_end: []
modify_information: [] modify_information: []
modify_information_start: [] modify_information_start: []
modify_information_end: [] modify_information_end: []
VDU_0: VDU_0:
type: tosca.nodes.nfv.Vdu.Compute type: tosca.nodes.nfv.Vdu.Compute
properties: properties:
name: VDU_0 name: VDU_0
description: VDU_0 description: VDU_0
vdu_profile: vdu_profile:
min_number_of_instances: 1 min_number_of_instances: 1
max_number_of_instances: 1 max_number_of_instances: 1
sw_image_data: sw_image_data:
name: sample_image name: sample_image
version: '1.0' version: '1.0'
checksum: checksum:
algorithm: sha-512 algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78 hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare container_format: bare
disk_format: qcow2 disk_format: qcow2
min_disk: 0 GB min_disk: 0 GB
size: 1869 MB size: 1869 MB
capabilities: capabilities:
virtual_compute: virtual_compute:
properties: properties:
requested_additional_capabilities: requested_additional_capabilities:
properties: properties:
requested_additional_capability_name: sample_flavor requested_additional_capability_name: sample_flavor
support_mandatory: true support_mandatory: true
target_performance_parameters: target_performance_parameters:
entry_schema: test entry_schema: test
virtual_memory: virtual_memory:
virtual_mem_size: 512 MB virtual_mem_size: 512 MB
virtual_cpu: virtual_cpu:
num_virtual_cpu: 1 num_virtual_cpu: 1
virtual_local_storage: virtual_local_storage:
- size_of_storage: 1 GB - size_of_storage: 1 GB
VDU_1: VDU_1:
type: tosca.nodes.nfv.Vdu.Compute type: tosca.nodes.nfv.Vdu.Compute
properties: properties:
name: VDU_1 name: VDU_1
description: VDU_1 description: VDU_1
vdu_profile: vdu_profile:
min_number_of_instances: 1 min_number_of_instances: 1
max_number_of_instances: 1 max_number_of_instances: 1
sw_image_data: sw_image_data:
name: sample_image name: sample_image
version: '1.0' version: '1.0'
checksum: checksum:
algorithm: sha-512 algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78 hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare container_format: bare
disk_format: qcow2 disk_format: qcow2
min_disk: 0 GB min_disk: 0 GB
size: 1869 MB size: 1869 MB
capabilities: capabilities:
virtual_compute: virtual_compute:
properties: properties:
requested_additional_capabilities: requested_additional_capabilities:
properties: properties:
requested_additional_capability_name: sample_flavor requested_additional_capability_name: sample_flavor
support_mandatory: true support_mandatory: true
target_performance_parameters: target_performance_parameters:
entry_schema: test entry_schema: test
virtual_memory: virtual_memory:
virtual_mem_size: 512 MB virtual_mem_size: 512 MB
virtual_cpu: virtual_cpu:
num_virtual_cpu: 1 num_virtual_cpu: 1
virtual_local_storage: virtual_local_storage:
- size_of_storage: 1 GB - size_of_storage: 1 GB
VDU0_CP0: VDU0_CP0:
type: tosca.nodes.nfv.VduCp type: tosca.nodes.nfv.VduCp
properties: properties:
order: 0 order: 0
bitrate_requirement: 1 bitrate_requirement: 1
vnic_type: normal vnic_type: normal
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
protocol: protocol:
- associated_layer_protocol: ipv4 - associated_layer_protocol: ipv4
address_data: address_data:
- address_type: ip_address - address_type: ip_address
l3_address_data: l3_address_data:
ip_address_assignment: true ip_address_assignment: true
floating_ip_activated: false floating_ip_activated: false
requirements: requirements:
- virtual_binding: VDU_0 - virtual_binding: VDU_0
- virtual_link: VDU_intnet0 - virtual_link: VDU_intnet0
VDU1_CP0: VDU1_CP0:
type: tosca.nodes.nfv.VduCp type: tosca.nodes.nfv.VduCp
properties: properties:
order: 0 order: 0
bitrate_requirement: 1 bitrate_requirement: 1
vnic_type: normal vnic_type: normal
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
protocol: protocol:
- associated_layer_protocol: ipv4 - associated_layer_protocol: ipv4
address_data: address_data:
- address_type: ip_address - address_type: ip_address
l3_address_data: l3_address_data:
ip_address_assignment: true ip_address_assignment: true
floating_ip_activated: false floating_ip_activated: false
requirements: requirements:
- virtual_binding: VDU_1 - virtual_binding: VDU_1
- virtual_link: VDU_intnet0 - virtual_link: VDU_intnet0
VDU0_extCP0: VDU0_extCP0:
type: tosca.nodes.nfv.VnfExtCp type: tosca.nodes.nfv.VnfExtCp
properties: properties:
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
requirements: requirements:
- internal_virtual_link: VDU_intnet0 - internal_virtual_link: VDU_intnet0
VDU1_extCP0: VDU1_extCP0:
type: tosca.nodes.nfv.VnfExtCp type: tosca.nodes.nfv.VnfExtCp
properties: properties:
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
requirements: requirements:
- internal_virtual_link: VDU_intnet0 - internal_virtual_link: VDU_intnet0
VDU_intnet0: VDU_intnet0:
type: tosca.nodes.nfv.VnfVirtualLink type: tosca.nodes.nfv.VnfVirtualLink
properties: properties:
connectivity_type: connectivity_type:
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
vl_profile: vl_profile:
max_bitrate_requirements: max_bitrate_requirements:
root: 1000000 root: 1000000
min_bitrate_requirements: min_bitrate_requirements:
root: 100000 root: 100000
virtual_link_protocol_data: virtual_link_protocol_data:
- associated_layer_protocol: ipv4 - associated_layer_protocol: ipv4
l2_protocol_data: l2_protocol_data:
network_type: vxlan network_type: vxlan
l3_protocol_data: l3_protocol_data:
ip_version: ipv4 ip_version: ipv4
cidr: '192.168.0.0/24' cidr: '192.168.0.0/24'
dhcp_enabled: true dhcp_enabled: true
RT_extCP: RT_extCP:
type: tosca.nodes.nfv.VnfExtCp type: tosca.nodes.nfv.VnfExtCp
properties: properties:
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
requirements: requirements:
- internal_virtual_link: VDU_intnet0 - internal_virtual_link: VDU_intnet0
VDU_extvCP: VDU_extvCP:
type: tosca.nodes.nfv.VnfExtCp type: tosca.nodes.nfv.VnfExtCp
properties: properties:
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
requirements: requirements:
- internal_virtual_link: VDU_intnet0 - internal_virtual_link: VDU_intnet0
groups: groups:
VDU_AntiAffinityGroup: VDU_AntiAffinityGroup:
type: tosca.groups.nfv.PlacementGroup type: tosca.groups.nfv.PlacementGroup
members: [ VDU_0, VDU_1 ] members: [ VDU_0, VDU_1 ]
policies: policies:
- VDU_placement_policy: - VDU_placement_policy:
type: tosca.policies.nfv.AntiAffinityRule type: tosca.policies.nfv.AntiAffinityRule
targets: [ VDU_AntiAffinityGroup ] targets: [ VDU_AntiAffinityGroup ]
properties: properties:
scope: nfvi_node scope: nfvi_node

View File

@ -1,391 +1,391 @@
tosca_definitions_version: tosca_simple_yaml_1_2 tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample VNF default DF description: Sample VNF default DF
imports: imports:
- etsi_nfv_sol001_common_types.yaml - etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml - etsi_nfv_sol001_vnfd_types.yaml
- Common.yaml - Common.yaml
topology_template: topology_template:
inputs: inputs:
descriptor_id: descriptor_id:
type: string type: string
descriptor_version: descriptor_version:
type: string type: string
provider: provider:
type: string type: string
product_name: product_name:
type: string type: string
software_version: software_version:
type: string type: string
vnfm_info: vnfm_info:
type: list type: list
entry_schema: entry_schema:
type: string type: string
flavour_id: flavour_id:
type: string type: string
flavour_description: flavour_description:
type: string type: string
configurable_properties: configurable_properties:
type: map type: map
substitution_mappings: substitution_mappings:
node_type: Sample.VNF.Node node_type: Sample.VNF.Node
properties: properties:
flavour_id: scalable flavour_id: scalable
requirements: requirements:
VDU0_extnet: [ VDU0_CP1, external_virtual_link ] VDU0_extnet: [ VDU0_CP1, external_virtual_link ]
VDU1_extnet: [ VDU1_CP1, external_virtual_link ] VDU1_extnet: [ VDU1_CP1, external_virtual_link ]
VDU2_extnet: [ VDU2_CP1, external_virtual_link ] VDU2_extnet: [ VDU2_CP1, external_virtual_link ]
node_templates: node_templates:
VNF: VNF:
type: Sample.VNF.Node type: Sample.VNF.Node
properties: properties:
flavour_description: 'scalable' flavour_description: 'scalable'
configurable_properties: configurable_properties:
is_autoscale_enabled: false is_autoscale_enabled: false
is_autoheal_enabled: false is_autoheal_enabled: false
vnfm_info: vnfm_info:
- Tacker - Tacker
interfaces: interfaces:
Vnflcm: Vnflcm:
instantiate: [] instantiate: []
instantiate_start: [] instantiate_start: []
instantiate_end: [] instantiate_end: []
scale: [] scale: []
scale_start: [] scale_start: []
scale_end: [] scale_end: []
heal: [] heal: []
heal_start: [] heal_start: []
heal_end: [] heal_end: []
terminate: [] terminate: []
terminate_start: [] terminate_start: []
terminate_end: [] terminate_end: []
modify_information: [] modify_information: []
modify_information_start: [] modify_information_start: []
modify_information_end: [] modify_information_end: []
VDU_0: VDU_0:
type: tosca.nodes.nfv.Vdu.Compute type: tosca.nodes.nfv.Vdu.Compute
properties: properties:
name: VDU_0 name: VDU_0
description: VDU_0 description: VDU_0
vdu_profile: vdu_profile:
min_number_of_instances: 1 min_number_of_instances: 1
max_number_of_instances: 1 max_number_of_instances: 1
sw_image_data: sw_image_data:
name: sample_image name: sample_image
version: '1.0' version: '1.0'
checksum: checksum:
algorithm: sha-512 algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78 hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare container_format: bare
disk_format: qcow2 disk_format: qcow2
min_disk: 0 GB min_disk: 0 GB
size: 1869 MB size: 1869 MB
capabilities: capabilities:
virtual_compute: virtual_compute:
properties: properties:
requested_additional_capabilities: requested_additional_capabilities:
properties: properties:
requested_additional_capability_name: sample_flavor requested_additional_capability_name: sample_flavor
support_mandatory: true support_mandatory: true
target_performance_parameters: target_performance_parameters:
entry_schema: test entry_schema: test
virtual_memory: virtual_memory:
virtual_mem_size: 512 MB virtual_mem_size: 512 MB
virtual_cpu: virtual_cpu:
num_virtual_cpu: 1 num_virtual_cpu: 1
virtual_local_storage: virtual_local_storage:
- size_of_storage: 1 GB - size_of_storage: 1 GB
VDU_1: VDU_1:
type: tosca.nodes.nfv.Vdu.Compute type: tosca.nodes.nfv.Vdu.Compute
properties: properties:
name: VDU_1 name: VDU_1
description: VDU_1 description: VDU_1
vdu_profile: vdu_profile:
min_number_of_instances: 1 min_number_of_instances: 1
max_number_of_instances: 1 max_number_of_instances: 1
sw_image_data: sw_image_data:
name: sample_image name: sample_image
version: '1.0' version: '1.0'
checksum: checksum:
algorithm: sha-512 algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78 hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare container_format: bare
disk_format: qcow2 disk_format: qcow2
min_disk: 0 GB min_disk: 0 GB
size: 1869 MB size: 1869 MB
capabilities: capabilities:
virtual_compute: virtual_compute:
properties: properties:
requested_additional_capabilities: requested_additional_capabilities:
properties: properties:
requested_additional_capability_name: sample_flavor requested_additional_capability_name: sample_flavor
support_mandatory: true support_mandatory: true
target_performance_parameters: target_performance_parameters:
entry_schema: test entry_schema: test
virtual_memory: virtual_memory:
virtual_mem_size: 512 MB virtual_mem_size: 512 MB
virtual_cpu: virtual_cpu:
num_virtual_cpu: 1 num_virtual_cpu: 1
virtual_local_storage: virtual_local_storage:
- size_of_storage: 1 GB - size_of_storage: 1 GB
VDU_2: VDU_2:
type: tosca.nodes.nfv.Vdu.Compute type: tosca.nodes.nfv.Vdu.Compute
properties: properties:
name: VDU_2 name: VDU_2
description: VDU_2 description: VDU_2
vdu_profile: vdu_profile:
min_number_of_instances: 0 min_number_of_instances: 0
max_number_of_instances: 1 max_number_of_instances: 1
sw_image_data: sw_image_data:
name: sample_image name: sample_image
version: '1.0' version: '1.0'
checksum: checksum:
algorithm: sha-512 algorithm: sha-512
hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78 hash: 6513f21e44aa3da349f248188a44bc304a3653a04122d8fb4535423c8e1d14cd6a153f735bb0982e2161b5b5186106570c17a9e58b64dd39390617cd5a350f78
container_format: bare container_format: bare
disk_format: qcow2 disk_format: qcow2
min_disk: 0 GB min_disk: 0 GB
size: 1869 MB size: 1869 MB
capabilities: capabilities:
virtual_compute: virtual_compute:
properties: properties:
requested_additional_capabilities: requested_additional_capabilities:
properties: properties:
requested_additional_capability_name: sample_flavor requested_additional_capability_name: sample_flavor
support_mandatory: true support_mandatory: true
target_performance_parameters: target_performance_parameters:
entry_schema: test entry_schema: test
virtual_memory: virtual_memory:
virtual_mem_size: 512 MB virtual_mem_size: 512 MB
virtual_cpu: virtual_cpu:
num_virtual_cpu: 1 num_virtual_cpu: 1
virtual_local_storage: virtual_local_storage:
- size_of_storage: 1 GB - size_of_storage: 1 GB
VDU0_CP0: VDU0_CP0:
type: tosca.nodes.nfv.VduCp type: tosca.nodes.nfv.VduCp
properties: properties:
order: 0 order: 0
bitrate_requirement: 1 bitrate_requirement: 1
vnic_type: normal vnic_type: normal
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
protocol: protocol:
- associated_layer_protocol: ipv4 - associated_layer_protocol: ipv4
address_data: address_data:
- address_type: ip_address - address_type: ip_address
l3_address_data: l3_address_data:
ip_address_assignment: true ip_address_assignment: true
floating_ip_activated: false floating_ip_activated: false
requirements: requirements:
- virtual_binding: VDU_0 - virtual_binding: VDU_0
- virtual_link: int_net - virtual_link: int_net
VDU0_CP1: VDU0_CP1:
type: tosca.nodes.nfv.VduCp type: tosca.nodes.nfv.VduCp
properties: properties:
order: 0 order: 0
bitrate_requirement: 1 bitrate_requirement: 1
vnic_type: normal vnic_type: normal
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
protocol: protocol:
- associated_layer_protocol: ipv4 - associated_layer_protocol: ipv4
address_data: address_data:
- address_type: ip_address - address_type: ip_address
l3_address_data: l3_address_data:
ip_address_assignment: true ip_address_assignment: true
floating_ip_activated: false floating_ip_activated: false
requirements: requirements:
- virtual_binding: VDU_0 - virtual_binding: VDU_0
VDU1_CP0: VDU1_CP0:
type: tosca.nodes.nfv.VduCp type: tosca.nodes.nfv.VduCp
properties: properties:
order: 0 order: 0
bitrate_requirement: 1 bitrate_requirement: 1
vnic_type: normal vnic_type: normal
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
protocol: protocol:
- associated_layer_protocol: ipv4 - associated_layer_protocol: ipv4
address_data: address_data:
- address_type: ip_address - address_type: ip_address
l3_address_data: l3_address_data:
ip_address_assignment: true ip_address_assignment: true
floating_ip_activated: false floating_ip_activated: false
requirements: requirements:
- virtual_binding: VDU_1 - virtual_binding: VDU_1
- virtual_link: int_net - virtual_link: int_net
VDU1_CP1: VDU1_CP1:
type: tosca.nodes.nfv.VduCp type: tosca.nodes.nfv.VduCp
properties: properties:
order: 0 order: 0
bitrate_requirement: 1 bitrate_requirement: 1
vnic_type: normal vnic_type: normal
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
protocol: protocol:
- associated_layer_protocol: ipv4 - associated_layer_protocol: ipv4
address_data: address_data:
- address_type: ip_address - address_type: ip_address
l3_address_data: l3_address_data:
ip_address_assignment: true ip_address_assignment: true
floating_ip_activated: false floating_ip_activated: false
requirements: requirements:
- virtual_binding: VDU_1 - virtual_binding: VDU_1
VDU2_CP0: VDU2_CP0:
type: tosca.nodes.nfv.VduCp type: tosca.nodes.nfv.VduCp
properties: properties:
order: 0 order: 0
bitrate_requirement: 1 bitrate_requirement: 1
vnic_type: normal vnic_type: normal
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
protocol: protocol:
- associated_layer_protocol: ipv4 - associated_layer_protocol: ipv4
address_data: address_data:
- address_type: ip_address - address_type: ip_address
l3_address_data: l3_address_data:
ip_address_assignment: true ip_address_assignment: true
floating_ip_activated: false floating_ip_activated: false
requirements: requirements:
- virtual_binding: VDU_2 - virtual_binding: VDU_2
- virtual_link: int_net - virtual_link: int_net
VDU2_CP1: VDU2_CP1:
type: tosca.nodes.nfv.VduCp type: tosca.nodes.nfv.VduCp
properties: properties:
order: 0 order: 0
bitrate_requirement: 1 bitrate_requirement: 1
vnic_type: normal vnic_type: normal
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
protocol: protocol:
- associated_layer_protocol: ipv4 - associated_layer_protocol: ipv4
address_data: address_data:
- address_type: ip_address - address_type: ip_address
l3_address_data: l3_address_data:
ip_address_assignment: true ip_address_assignment: true
floating_ip_activated: false floating_ip_activated: false
requirements: requirements:
- virtual_binding: VDU_2 - virtual_binding: VDU_2
int_net: int_net:
type: tosca.nodes.nfv.VnfVirtualLink type: tosca.nodes.nfv.VnfVirtualLink
properties: properties:
connectivity_type: connectivity_type:
layer_protocols: [ ipv4 ] layer_protocols: [ ipv4 ]
vl_profile: vl_profile:
max_bitrate_requirements: max_bitrate_requirements:
root: 1000000 root: 1000000
min_bitrate_requirements: min_bitrate_requirements:
root: 100000 root: 100000
virtual_link_protocol_data: virtual_link_protocol_data:
- associated_layer_protocol: ipv4 - associated_layer_protocol: ipv4
l2_protocol_data: l2_protocol_data:
network_type: vxlan network_type: vxlan
l3_protocol_data: l3_protocol_data:
ip_version: ipv4 ip_version: ipv4
cidr: '192.168.1.0/24' cidr: '192.168.1.0/24'
groups: groups:
VDU_AntiAffinityGroup: VDU_AntiAffinityGroup:
type: tosca.groups.nfv.PlacementGroup type: tosca.groups.nfv.PlacementGroup
members: [ VDU_0, VDU_1, VDU_2 ] members: [ VDU_0, VDU_1, VDU_2 ]
policies: policies:
- VDU_placement_policy: - VDU_placement_policy:
type: tosca.policies.nfv.AntiAffinityRule type: tosca.policies.nfv.AntiAffinityRule
targets: [ VDU_AntiAffinityGroup ] targets: [ VDU_AntiAffinityGroup ]
properties: properties:
scope: nfvi_node scope: nfvi_node
- vdu_scale: - vdu_scale:
type: tosca.policies.nfv.ScalingAspects type: tosca.policies.nfv.ScalingAspects
properties: properties:
aspects: aspects:
VDU_2: VDU_2:
name: VDU_2 name: VDU_2
description: VDU_2 description: VDU_2
max_scale_level: 1 max_scale_level: 1
step_deltas: step_deltas:
- delta_1 - delta_1
- vdu_0_initial_delta: - vdu_0_initial_delta:
type: tosca.policies.nfv.VduInitialDelta type: tosca.policies.nfv.VduInitialDelta
properties: properties:
initial_delta: initial_delta:
number_of_instances: 1 number_of_instances: 1
targets: [ VDU_0 ] targets: [ VDU_0 ]
- vdu_1_initial_delta: - vdu_1_initial_delta:
type: tosca.policies.nfv.VduInitialDelta type: tosca.policies.nfv.VduInitialDelta
properties: properties:
initial_delta: initial_delta:
number_of_instances: 1 number_of_instances: 1
targets: [ VDU_1 ] targets: [ VDU_1 ]
- vdu_2_initial_delta: - vdu_2_initial_delta:
type: tosca.policies.nfv.VduInitialDelta type: tosca.policies.nfv.VduInitialDelta
properties: properties:
initial_delta: initial_delta:
number_of_instances: 0 number_of_instances: 0
targets: [ VDU_2 ] targets: [ VDU_2 ]
- vdu_2_scaling_aspect_deltas: - vdu_2_scaling_aspect_deltas:
type: tosca.policies.nfv.VduScalingAspectDeltas type: tosca.policies.nfv.VduScalingAspectDeltas
properties: properties:
aspect: VDU_2 aspect: VDU_2
deltas: deltas:
delta_1: delta_1:
number_of_instances: 1 number_of_instances: 1
targets: [ VDU_2 ] targets: [ VDU_2 ]
- instantiation_levels: - instantiation_levels:
type: tosca.policies.nfv.InstantiationLevels type: tosca.policies.nfv.InstantiationLevels
properties: properties:
levels: levels:
r-node-min: r-node-min:
description: vdu-min structure description: vdu-min structure
scale_info: scale_info:
VDU_2: VDU_2:
scale_level: 0 scale_level: 0
r-node-max: r-node-max:
description: vdu-max structure description: vdu-max structure
scale_info: scale_info:
VDU_2: VDU_2:
scale_level: 1 scale_level: 1
- vdu_0_instantiation_levels: - vdu_0_instantiation_levels:
type: tosca.policies.nfv.VduInstantiationLevels type: tosca.policies.nfv.VduInstantiationLevels
properties: properties:
levels: levels:
r-node-min: r-node-min:
number_of_instances: 1 number_of_instances: 1
r-node-max: r-node-max:
number_of_instances: 1 number_of_instances: 1
targets: [ VDU_0 ] targets: [ VDU_0 ]
- vdu_1_instantiation_levels: - vdu_1_instantiation_levels:
type: tosca.policies.nfv.VduInstantiationLevels type: tosca.policies.nfv.VduInstantiationLevels
properties: properties:
levels: levels:
r-node-min: r-node-min:
number_of_instances: 1 number_of_instances: 1
r-node-max: r-node-max:
number_of_instances: 1 number_of_instances: 1
targets: [ VDU_1 ] targets: [ VDU_1 ]
- vdu_2_instantiation_levels: - vdu_2_instantiation_levels:
type: tosca.policies.nfv.VduInstantiationLevels type: tosca.policies.nfv.VduInstantiationLevels
properties: properties:
levels: levels:
r-node-min: r-node-min:
number_of_instances: 0 number_of_instances: 0
r-node-max: r-node-max:
number_of_instances: 1 number_of_instances: 1
targets: [ VDU_2 ] targets: [ VDU_2 ]

View File

@ -1,35 +1,35 @@
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import tacker.vnfm.lcm_user_data.utils as UserDataUtil import tacker.vnfm.lcm_user_data.utils as UserDataUtil
from tacker.vnfm.lcm_user_data.abstract_user_data import AbstractUserData from tacker.vnfm.lcm_user_data.abstract_user_data import AbstractUserData
class SampleUserData(AbstractUserData): class SampleUserData(AbstractUserData):
@staticmethod @staticmethod
def instantiate(base_hot_dict=None, def instantiate(base_hot_dict=None,
vnfd_dict=None, vnfd_dict=None,
inst_req_info=None, inst_req_info=None,
grant_info=None): grant_info=None):
api_param = UserDataUtil.get_diff_base_hot_param_from_api( api_param = UserDataUtil.get_diff_base_hot_param_from_api(
base_hot_dict, inst_req_info) base_hot_dict, inst_req_info)
initial_param_dict = \ initial_param_dict = \
UserDataUtil.create_initial_param_server_port_dict(base_hot_dict) UserDataUtil.create_initial_param_server_port_dict(base_hot_dict)
vdu_flavor_dict = \ vdu_flavor_dict = \
UserDataUtil.create_vdu_flavor_capability_name_dict(vnfd_dict) UserDataUtil.create_vdu_flavor_capability_name_dict(vnfd_dict)
vdu_image_dict = UserDataUtil.create_sw_image_dict(vnfd_dict) vdu_image_dict = UserDataUtil.create_sw_image_dict(vnfd_dict)
cpd_vl_dict = UserDataUtil.create_network_dict(inst_req_info, cpd_vl_dict = UserDataUtil.create_network_dict(inst_req_info,
initial_param_dict) initial_param_dict)
final_param_dict = UserDataUtil.create_final_param_dict( final_param_dict = UserDataUtil.create_final_param_dict(
initial_param_dict, vdu_flavor_dict, vdu_image_dict, cpd_vl_dict) initial_param_dict, vdu_flavor_dict, vdu_image_dict, cpd_vl_dict)
return {**final_param_dict, **api_param} return {**final_param_dict, **api_param}

View File

@ -1,370 +1,370 @@
# Copyright (C) 2023 Fujitsu # Copyright (C) 2023 Fujitsu
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
""" """
Auth for External Server OAuth2.0 authentication Auth for External Server OAuth2.0 authentication
""" """
import time import time
import uuid import uuid
import jwt.utils import jwt.utils
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import requests.auth import requests.auth
from keystoneauth1 import exceptions as ksa_exceptions from keystoneauth1 import exceptions as ksa_exceptions
from keystoneauth1.loading import session as session_loading from keystoneauth1.loading import session as session_loading
from tacker._i18n import _ from tacker._i18n import _
from tacker.common.exceptions import TackerException from tacker.common.exceptions import TackerException
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
_EXT_AUTH_CONFIG_GROUP_NAME = 'ext_oauth2_auth' _EXT_AUTH_CONFIG_GROUP_NAME = 'ext_oauth2_auth'
_EXTERNAL_AUTH2_OPTS = [ _EXTERNAL_AUTH2_OPTS = [
cfg.BoolOpt('use_ext_oauth2_auth', default=False, cfg.BoolOpt('use_ext_oauth2_auth', default=False,
help='Set True to use external Oauth2.0 auth server.'), help='Set True to use external Oauth2.0 auth server.'),
cfg.StrOpt('token_endpoint', cfg.StrOpt('token_endpoint',
help='The endpoint for access token API.'), help='The endpoint for access token API.'),
cfg.StrOpt('scope', cfg.StrOpt('scope',
help='The scope that the access token can access.'), help='The scope that the access token can access.'),
] ]
_EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS = [ _EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS = [
cfg.StrOpt('certfile', cfg.StrOpt('certfile',
help='Required if identity server requires client ' help='Required if identity server requires client '
'certificate.'), 'certificate.'),
cfg.StrOpt('keyfile', cfg.StrOpt('keyfile',
help='Required if identity server requires client ' help='Required if identity server requires client '
'private key.'), 'private key.'),
cfg.StrOpt('cafile', cfg.StrOpt('cafile',
help='A PEM encoded Certificate Authority to use when ' help='A PEM encoded Certificate Authority to use when '
'verifying HTTPs connections. Defaults to system CAs.'), 'verifying HTTPs connections. Defaults to system CAs.'),
cfg.BoolOpt('insecure', default=False, help='Verify HTTPS connections.'), cfg.BoolOpt('insecure', default=False, help='Verify HTTPS connections.'),
cfg.IntOpt('http_connect_timeout', cfg.IntOpt('http_connect_timeout',
help='Request timeout value for communicating with Identity ' help='Request timeout value for communicating with Identity '
'API server.'), 'API server.'),
cfg.StrOpt('audience', cfg.StrOpt('audience',
help='The Audience should be the URL of the Authorization ' help='The Audience should be the URL of the Authorization '
"Server's Token Endpoint. The Authorization Server will " "Server's Token Endpoint. The Authorization Server will "
'verify that it is an intended audience for the token.'), 'verify that it is an intended audience for the token.'),
cfg.StrOpt('auth_method', cfg.StrOpt('auth_method',
default='client_secret_basic', default='client_secret_basic',
choices=('client_secret_basic', 'client_secret_post', choices=('client_secret_basic', 'client_secret_post',
'tls_client_auth', 'private_key_jwt', 'tls_client_auth', 'private_key_jwt',
'client_secret_jwt'), 'client_secret_jwt'),
help='The auth_method must use the authentication method ' help='The auth_method must use the authentication method '
'specified by the Authorization Server.'), 'specified by the Authorization Server.'),
cfg.StrOpt('client_id', cfg.StrOpt('client_id',
help='The OAuth 2.0 Client Identifier valid at the ' help='The OAuth 2.0 Client Identifier valid at the '
'Authorization Server.'), 'Authorization Server.'),
cfg.StrOpt('client_secret', cfg.StrOpt('client_secret',
help='The OAuth 2.0 client secret. When the auth_method is ' help='The OAuth 2.0 client secret. When the auth_method is '
'client_secret_basic, client_secret_post, or ' 'client_secret_basic, client_secret_post, or '
'client_secret_jwt, the value is used, and otherwise the ' 'client_secret_jwt, the value is used, and otherwise the '
'value is ignored.'), 'value is ignored.'),
cfg.StrOpt('jwt_key_file', cfg.StrOpt('jwt_key_file',
help='The jwt_key_file must use the certificate key file which ' help='The jwt_key_file must use the certificate key file which '
'has been registered with the Authorization Server. ' 'has been registered with the Authorization Server. '
'When the auth_method is private_key_jwt, the value is ' 'When the auth_method is private_key_jwt, the value is '
'used, and otherwise the value is ignored.'), 'used, and otherwise the value is ignored.'),
cfg.StrOpt('jwt_algorithm', cfg.StrOpt('jwt_algorithm',
help='The jwt_algorithm must use the algorithm specified by ' help='The jwt_algorithm must use the algorithm specified by '
'the Authorization Server. When the auth_method is ' 'the Authorization Server. When the auth_method is '
'client_secret_jwt, this value is often set to HS256,' 'client_secret_jwt, this value is often set to HS256,'
'when the auth_method is private_key_jwt, the value is ' 'when the auth_method is private_key_jwt, the value is '
'often set to RS256, and otherwise the value is ignored.'), 'often set to RS256, and otherwise the value is ignored.'),
cfg.IntOpt('jwt_bearer_time_out', default=3600, cfg.IntOpt('jwt_bearer_time_out', default=3600,
help='This value is used to calculate the expiration time. If ' help='This value is used to calculate the expiration time. If '
'after the expiration time, the access token cannot be ' 'after the expiration time, the access token cannot be '
'accepted. When the auth_method is client_secret_jwt or ' 'accepted. When the auth_method is client_secret_jwt or '
'private_key_jwt, the value is used, and otherwise the ' 'private_key_jwt, the value is used, and otherwise the '
'value is ignored.'), 'value is ignored.'),
] ]
def config_opts(): def config_opts():
return [(_EXT_AUTH_CONFIG_GROUP_NAME, return [(_EXT_AUTH_CONFIG_GROUP_NAME,
_EXTERNAL_AUTH2_OPTS + _EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS)] _EXTERNAL_AUTH2_OPTS + _EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS)]
cfg.CONF.register_opts(_EXTERNAL_AUTH2_OPTS, cfg.CONF.register_opts(_EXTERNAL_AUTH2_OPTS,
group=_EXT_AUTH_CONFIG_GROUP_NAME) group=_EXT_AUTH_CONFIG_GROUP_NAME)
class ExtOAuth2Auth(object): class ExtOAuth2Auth(object):
"""Construct an Auth to fetch an access token for HTTP access.""" """Construct an Auth to fetch an access token for HTTP access."""
def __init__(self): def __init__(self):
self._conf = cfg.CONF.ext_oauth2_auth self._conf = cfg.CONF.ext_oauth2_auth
# Check whether the configuration parameter has been registered # Check whether the configuration parameter has been registered
if 'auth_method' not in self._conf: if 'auth_method' not in self._conf:
LOG.debug('The relevant config parameters are not registered ' LOG.debug('The relevant config parameters are not registered '
'and need to be registered before they can be used.') 'and need to be registered before they can be used.')
cfg.CONF.register_opts(_EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS, cfg.CONF.register_opts(_EXTERNAL_AUTH2_KEYSTONE_MIDDLEWARE_OPTS,
group=_EXT_AUTH_CONFIG_GROUP_NAME) group=_EXT_AUTH_CONFIG_GROUP_NAME)
self.token_endpoint = self._get_config_option( self.token_endpoint = self._get_config_option(
'token_endpoint', is_required=True) 'token_endpoint', is_required=True)
self.auth_method = self._get_config_option( self.auth_method = self._get_config_option(
'auth_method', is_required=True) 'auth_method', is_required=True)
self.client_id = self._get_config_option( self.client_id = self._get_config_option(
'client_id', is_required=True) 'client_id', is_required=True)
self.scope = self._get_config_option( self.scope = self._get_config_option(
'scope', is_required=True) 'scope', is_required=True)
self.access_token = None self.access_token = None
def _get_config_option(self, key, is_required): def _get_config_option(self, key, is_required):
"""Read the value from config file by the config key.""" """Read the value from config file by the config key."""
try: try:
value = getattr(self._conf, key) value = getattr(self._conf, key)
except cfg.NoSuchOptError: except cfg.NoSuchOptError:
value = None value = None
if not value: if not value:
if is_required: if is_required:
LOG.error('The value is required for option %s ' LOG.error('The value is required for option %s '
'in group [%s]' % (key, 'in group [%s]' % (key,
_EXT_AUTH_CONFIG_GROUP_NAME)) _EXT_AUTH_CONFIG_GROUP_NAME))
raise TackerException( raise TackerException(
_('Configuration error. The parameter ' _('Configuration error. The parameter '
'is not set for "%s" in group [%s].') % ( 'is not set for "%s" in group [%s].') % (
key, _EXT_AUTH_CONFIG_GROUP_NAME)) key, _EXT_AUTH_CONFIG_GROUP_NAME))
else: else:
return None return None
else: else:
return value return value
def create_session(self, **kwargs): def create_session(self, **kwargs):
"""Create session for HTTP access.""" """Create session for HTTP access."""
kwargs.setdefault('cert', self._get_config_option( kwargs.setdefault('cert', self._get_config_option(
'certfile', is_required=False)) 'certfile', is_required=False))
kwargs.setdefault('key', self._get_config_option( kwargs.setdefault('key', self._get_config_option(
'keyfile', is_required=False)) 'keyfile', is_required=False))
kwargs.setdefault('cacert', self._get_config_option( kwargs.setdefault('cacert', self._get_config_option(
'cafile', is_required=False)) 'cafile', is_required=False))
kwargs.setdefault('insecure', self._get_config_option( kwargs.setdefault('insecure', self._get_config_option(
'insecure', is_required=False)) 'insecure', is_required=False))
kwargs.setdefault('timeout', self._get_config_option( kwargs.setdefault('timeout', self._get_config_option(
'http_connect_timeout', is_required=False)) 'http_connect_timeout', is_required=False))
kwargs.setdefault('user_agent', 'tacker service') kwargs.setdefault('user_agent', 'tacker service')
sess = session_loading.Session().load_from_options(**kwargs) sess = session_loading.Session().load_from_options(**kwargs)
sess.auth = self sess.auth = self
return sess return sess
def get_connection_params(self, session, **kwargs): def get_connection_params(self, session, **kwargs):
"""Get connection params for HTTP access.""" """Get connection params for HTTP access."""
return {} return {}
def invalidate(self): def invalidate(self):
"""Invalidate the current authentication data.""" """Invalidate the current authentication data."""
self.access_token = None self.access_token = None
return True return True
def _get_token_by_client_secret_basic(self, session): def _get_token_by_client_secret_basic(self, session):
"""Access the access token API. """Access the access token API.
Access the access token API to get an access token by Access the access token API to get an access token by
the auth method 'client_secret_basic'. the auth method 'client_secret_basic'.
""" """
para = { para = {
'scope': self.scope, 'scope': self.scope,
'grant_type': 'client_credentials' 'grant_type': 'client_credentials'
} }
auth = requests.auth.HTTPBasicAuth( auth = requests.auth.HTTPBasicAuth(
self.client_id, self._get_config_option( self.client_id, self._get_config_option(
'client_secret', is_required=True)) 'client_secret', is_required=True))
http_response = session.request( http_response = session.request(
self.token_endpoint, self.token_endpoint,
'POST', 'POST',
authenticated=False, authenticated=False,
data=para, data=para,
requests_auth=auth) requests_auth=auth)
return http_response return http_response
def _get_token_by_client_secret_post(self, session): def _get_token_by_client_secret_post(self, session):
"""Access the access token API. """Access the access token API.
Access the access token API to get an access token by Access the access token API to get an access token by
the auth method 'client_secret_post'. the auth method 'client_secret_post'.
""" """
para = { para = {
'client_id': self.client_id, 'client_id': self.client_id,
'client_secret': self._get_config_option( 'client_secret': self._get_config_option(
'client_secret', is_required=True), 'client_secret', is_required=True),
'scope': self.scope, 'scope': self.scope,
'grant_type': 'client_credentials' 'grant_type': 'client_credentials'
} }
http_response = session.request( http_response = session.request(
self.token_endpoint, self.token_endpoint,
'POST', 'POST',
authenticated=False, authenticated=False,
data=para) data=para)
return http_response return http_response
def _get_token_by_tls_client_auth(self, session): def _get_token_by_tls_client_auth(self, session):
"""Access the access token API. """Access the access token API.
Access the access token API to get an access token by Access the access token API to get an access token by
the auth method 'tls_client_auth'. the auth method 'tls_client_auth'.
""" """
para = { para = {
'client_id': self.client_id, 'client_id': self.client_id,
'scope': self.scope, 'scope': self.scope,
'grant_type': 'client_credentials' 'grant_type': 'client_credentials'
} }
http_response = session.request( http_response = session.request(
self.token_endpoint, self.token_endpoint,
'POST', 'POST',
authenticated=False, authenticated=False,
data=para) data=para)
return http_response return http_response
def _get_token_by_private_key_jwt(self, session): def _get_token_by_private_key_jwt(self, session):
"""Access the access token API. """Access the access token API.
Access the access token API to get an access token by Access the access token API to get an access token by
the auth method 'private_key_jwt'. the auth method 'private_key_jwt'.
""" """
jwt_key_file = self._get_config_option( jwt_key_file = self._get_config_option(
'jwt_key_file', is_required=True) 'jwt_key_file', is_required=True)
with open(jwt_key_file, 'r') as jwt_file: with open(jwt_key_file, 'r') as jwt_file:
jwt_key = jwt_file.read() jwt_key = jwt_file.read()
ita = round(time.time()) ita = round(time.time())
exp = ita + self._get_config_option( exp = ita + self._get_config_option(
'jwt_bearer_time_out', is_required=True) 'jwt_bearer_time_out', is_required=True)
alg = self._get_config_option('jwt_algorithm', is_required=True) alg = self._get_config_option('jwt_algorithm', is_required=True)
client_assertion = jwt.encode( client_assertion = jwt.encode(
payload={ payload={
'jti': str(uuid.uuid4()), 'jti': str(uuid.uuid4()),
'iat': str(ita), 'iat': str(ita),
'exp': str(exp), 'exp': str(exp),
'iss': self.client_id, 'iss': self.client_id,
'sub': self.client_id, 'sub': self.client_id,
'aud': self._get_config_option('audience', is_required=True)}, 'aud': self._get_config_option('audience', is_required=True)},
headers={ headers={
'typ': 'JWT', 'typ': 'JWT',
'alg': alg}, 'alg': alg},
key=jwt_key, key=jwt_key,
algorithm=alg) algorithm=alg)
para = { para = {
'client_id': self.client_id, 'client_id': self.client_id,
'client_assertion_type': 'client_assertion_type':
'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion, 'client_assertion': client_assertion,
'scope': self.scope, 'scope': self.scope,
'grant_type': 'client_credentials' 'grant_type': 'client_credentials'
} }
http_response = session.request( http_response = session.request(
self.token_endpoint, self.token_endpoint,
'POST', 'POST',
authenticated=False, authenticated=False,
data=para) data=para)
return http_response return http_response
def _get_token_by_client_secret_jwt(self, session): def _get_token_by_client_secret_jwt(self, session):
"""Access the access token API. """Access the access token API.
Access the access token API to get an access token by Access the access token API to get an access token by
the auth method 'client_secret_jwt'. the auth method 'client_secret_jwt'.
""" """
ita = round(time.time()) ita = round(time.time())
exp = ita + self._get_config_option( exp = ita + self._get_config_option(
'jwt_bearer_time_out', is_required=True) 'jwt_bearer_time_out', is_required=True)
alg = self._get_config_option('jwt_algorithm', is_required=True) alg = self._get_config_option('jwt_algorithm', is_required=True)
client_secret = self._get_config_option( client_secret = self._get_config_option(
'client_secret', is_required=True) 'client_secret', is_required=True)
client_assertion = jwt.encode( client_assertion = jwt.encode(
payload={ payload={
'jti': str(uuid.uuid4()), 'jti': str(uuid.uuid4()),
'iat': str(ita), 'iat': str(ita),
'exp': str(exp), 'exp': str(exp),
'iss': self.client_id, 'iss': self.client_id,
'sub': self.client_id, 'sub': self.client_id,
'aud': self._get_config_option('audience', is_required=True)}, 'aud': self._get_config_option('audience', is_required=True)},
headers={ headers={
'typ': 'JWT', 'typ': 'JWT',
'alg': alg}, 'alg': alg},
key=client_secret, key=client_secret,
algorithm=alg) algorithm=alg)
para = { para = {
'client_id': self.client_id, 'client_id': self.client_id,
'client_secret': client_secret, 'client_secret': client_secret,
'client_assertion_type': 'client_assertion_type':
'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion, 'client_assertion': client_assertion,
'scope': self.scope, 'scope': self.scope,
'grant_type': 'client_credentials' 'grant_type': 'client_credentials'
} }
http_response = session.request( http_response = session.request(
self.token_endpoint, self.token_endpoint,
'POST', 'POST',
authenticated=False, authenticated=False,
data=para) data=para)
return http_response return http_response
def get_headers(self, session, **kwargs): def get_headers(self, session, **kwargs):
"""Get an access token and add to request header for HTTP access.""" """Get an access token and add to request header for HTTP access."""
if not self.access_token: if not self.access_token:
try: try:
if self.auth_method == 'tls_client_auth': if self.auth_method == 'tls_client_auth':
http_response = self._get_token_by_tls_client_auth(session) http_response = self._get_token_by_tls_client_auth(session)
elif self.auth_method == 'client_secret_post': elif self.auth_method == 'client_secret_post':
http_response = self._get_token_by_client_secret_post( http_response = self._get_token_by_client_secret_post(
session) session)
elif self.auth_method == 'client_secret_basic': elif self.auth_method == 'client_secret_basic':
http_response = self._get_token_by_client_secret_basic( http_response = self._get_token_by_client_secret_basic(
session) session)
elif self.auth_method == 'private_key_jwt': elif self.auth_method == 'private_key_jwt':
http_response = self._get_token_by_private_key_jwt( http_response = self._get_token_by_private_key_jwt(
session) session)
elif self.auth_method == 'client_secret_jwt': elif self.auth_method == 'client_secret_jwt':
http_response = self._get_token_by_client_secret_jwt( http_response = self._get_token_by_client_secret_jwt(
session) session)
else: else:
LOG.error('The value is incorrect for option ' LOG.error('The value is incorrect for option '
'auth_method in group [%s]' % 'auth_method in group [%s]' %
_EXT_AUTH_CONFIG_GROUP_NAME) _EXT_AUTH_CONFIG_GROUP_NAME)
raise TackerException( raise TackerException(
_('The configuration parameter for ' _('The configuration parameter for '
'key "auth_method" in group [%s] is incorrect.') % 'key "auth_method" in group [%s] is incorrect.') %
_EXT_AUTH_CONFIG_GROUP_NAME) _EXT_AUTH_CONFIG_GROUP_NAME)
LOG.debug(http_response.text) LOG.debug(http_response.text)
if http_response.status_code != 200: if http_response.status_code != 200:
LOG.error('The OAuth2.0 access token API returns an ' LOG.error('The OAuth2.0 access token API returns an '
'incorrect response. ' 'incorrect response. '
'response_status: %s, response_text: %s' % 'response_status: %s, response_text: %s' %
(http_response.status_code, (http_response.status_code,
http_response.text)) http_response.text))
raise TackerException(_('Failed to get an access token.')) raise TackerException(_('Failed to get an access token.'))
access_token = http_response.json().get('access_token') access_token = http_response.json().get('access_token')
if not access_token: if not access_token:
LOG.error('Failed to get an access token: %s', LOG.error('Failed to get an access token: %s',
http_response.text) http_response.text)
raise TackerException(_('Failed to get an access token.')) raise TackerException(_('Failed to get an access token.'))
self.access_token = access_token self.access_token = access_token
except (ksa_exceptions.ConnectFailure, except (ksa_exceptions.ConnectFailure,
ksa_exceptions.DiscoveryFailure, ksa_exceptions.DiscoveryFailure,
ksa_exceptions.RequestTimeout) as error: ksa_exceptions.RequestTimeout) as error:
LOG.error('Unable to get an access token: %s', error) LOG.error('Unable to get an access token: %s', error)
raise TackerException( raise TackerException(
_('The OAuth2.0 access token API service is ' _('The OAuth2.0 access token API service is '
'temporarily unavailable.')) 'temporarily unavailable.'))
except TackerException: except TackerException:
raise raise
except Exception as error: except Exception as error:
LOG.error('Unable to get an access token: %s', error) LOG.error('Unable to get an access token: %s', error)
raise TackerException( raise TackerException(
_('An exception occurred during the processing ' _('An exception occurred during the processing '
'of getting an access token')) 'of getting an access token'))
header = {'Authorization': f'Bearer {self.access_token}'} header = {'Authorization': f'Bearer {self.access_token}'}
return header return header

View File

@ -1,62 +1,62 @@
# Copyright (C) 2022 Fujitsu # Copyright (C) 2022 Fujitsu
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from tacker.sol_refactored.objects import base from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields from tacker.sol_refactored.objects import fields
# NFV-SOL 003 # NFV-SOL 003
# - v3.3.1 6.5.2.5 (API version: 2.1.0) # - v3.3.1 6.5.2.5 (API version: 2.1.0)
@base.TackerObjectRegistry.register @base.TackerObjectRegistry.register
class PerformanceInformationAvailableNotificationV2( class PerformanceInformationAvailableNotificationV2(
base.TackerObject, base.TackerObject,
base.TackerObjectDictCompat base.TackerObjectDictCompat
): ):
# Version 1.0: Initial version # Version 1.0: Initial version
VERSION = '1.0' VERSION = '1.0'
fields = { fields = {
'id': fields.StringField(nullable=False), 'id': fields.StringField(nullable=False),
'notificationType': fields.StringField(nullable=False), 'notificationType': fields.StringField(nullable=False),
'timeStamp': fields.DateTimeField(nullable=False), 'timeStamp': fields.DateTimeField(nullable=False),
'pmJobId': fields.StringField(nullable=False), 'pmJobId': fields.StringField(nullable=False),
'objectType': fields.StringField(nullable=False), 'objectType': fields.StringField(nullable=False),
'objectInstanceId': fields.StringField(nullable=False), 'objectInstanceId': fields.StringField(nullable=False),
'subObjectInstanceIds': fields.ListOfStringsField(nullable=True), 'subObjectInstanceIds': fields.ListOfStringsField(nullable=True),
'_links': fields.ObjectField( '_links': fields.ObjectField(
'PerformanceInformationAvailableNotificationV2_Links', 'PerformanceInformationAvailableNotificationV2_Links',
nullable=False), nullable=False),
} }
@base.TackerObjectRegistry.register @base.TackerObjectRegistry.register
class PerformanceInformationAvailableNotificationV2_Links( class PerformanceInformationAvailableNotificationV2_Links(
base.TackerObject, base.TackerObject,
base.TackerObjectDictCompat base.TackerObjectDictCompat
): ):
# Version 1.0: Initial version # Version 1.0: Initial version
VERSION = '1.0' VERSION = '1.0'
fields = { fields = {
'objectInstance': fields.ObjectField( 'objectInstance': fields.ObjectField(
'NotificationLink', nullable=True), 'NotificationLink', nullable=True),
'pmJob': fields.ObjectField( 'pmJob': fields.ObjectField(
'NotificationLink', nullable=False), 'NotificationLink', nullable=False),
'performanceReport': fields.ObjectField( 'performanceReport': fields.ObjectField(
'NotificationLink', nullable=False), 'NotificationLink', nullable=False),
} }

View File

@ -1,34 +1,34 @@
# Copyright (C) 2022 Fujitsu # Copyright (C) 2022 Fujitsu
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from tacker.sol_refactored.objects import base from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields from tacker.sol_refactored.objects import fields
# NFV-SOL 003 # NFV-SOL 003
# - v3.3.1 6.5.3.3 (API version: 2.0.0) # - v3.3.1 6.5.3.3 (API version: 2.0.0)
@base.TackerObjectRegistry.register @base.TackerObjectRegistry.register
class VnfPmJobCriteriaV2(base.TackerObject, base.TackerObjectDictCompat): class VnfPmJobCriteriaV2(base.TackerObject, base.TackerObjectDictCompat):
# Version 1.0: Initial version # Version 1.0: Initial version
VERSION = '1.0' VERSION = '1.0'
fields = { fields = {
'performanceMetric': fields.ListOfStringsField(nullable=True), 'performanceMetric': fields.ListOfStringsField(nullable=True),
'performanceMetricGroup': fields.ListOfStringsField(nullable=True), 'performanceMetricGroup': fields.ListOfStringsField(nullable=True),
'collectionPeriod': fields.IntegerField(nullable=False), 'collectionPeriod': fields.IntegerField(nullable=False),
'reportingPeriod': fields.IntegerField(nullable=False), 'reportingPeriod': fields.IntegerField(nullable=False),
'reportingBoundary': fields.DateTimeField(nullable=True), 'reportingBoundary': fields.DateTimeField(nullable=True),
} }

View File

@ -1,32 +1,32 @@
# Copyright (C) 2022 Fujitsu # Copyright (C) 2022 Fujitsu
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from tacker.sol_refactored.objects import base from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields from tacker.sol_refactored.objects import fields
# NFV-SOL 003 # NFV-SOL 003
# - v3.3.1 6.5.2.12 (API version: 2.1.0) # - v3.3.1 6.5.2.12 (API version: 2.1.0)
@base.TackerObjectRegistry.register @base.TackerObjectRegistry.register
class PmJobModificationsV2(base.TackerObject, base.TackerObjectDictCompat): class PmJobModificationsV2(base.TackerObject, base.TackerObjectDictCompat):
# Version 1.0: Initial version # Version 1.0: Initial version
VERSION = '1.0' VERSION = '1.0'
fields = { fields = {
'callbackUri': fields.StringField(nullable=True), 'callbackUri': fields.StringField(nullable=True),
'authentication': fields.ObjectField( 'authentication': fields.ObjectField(
'SubscriptionAuthentication', nullable=True), 'SubscriptionAuthentication', nullable=True),
} }

View File

@ -1,65 +1,65 @@
# Copyright (C) 2022 Fujitsu # Copyright (C) 2022 Fujitsu
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from tacker.sol_refactored.objects import base from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields from tacker.sol_refactored.objects import fields
# NFV-SOL 003 # NFV-SOL 003
# - v3.3.1 6.5.2.10 (API version: 2.1.0) # - v3.3.1 6.5.2.10 (API version: 2.1.0)
@base.TackerObjectRegistry.register @base.TackerObjectRegistry.register
class PerformanceReportV2(base.TackerPersistentObject, class PerformanceReportV2(base.TackerPersistentObject,
base.TackerObjectDictCompat): base.TackerObjectDictCompat):
# Version 1.0: Initial version # Version 1.0: Initial version
VERSION = '1.0' VERSION = '1.0'
# PerformanceReportV2 need 'id' and 'jobId' # PerformanceReportV2 need 'id' and 'jobId'
fields = { fields = {
'id': fields.StringField(nullable=False), 'id': fields.StringField(nullable=False),
'jobId': fields.StringField(nullable=False), 'jobId': fields.StringField(nullable=False),
'entries': fields.ListOfObjectsField( 'entries': fields.ListOfObjectsField(
'VnfPmReportV2_Entries', nullable=False), 'VnfPmReportV2_Entries', nullable=False),
} }
@base.TackerObjectRegistry.register @base.TackerObjectRegistry.register
class VnfPmReportV2_Entries(base.TackerObject, base.TackerObjectDictCompat): class VnfPmReportV2_Entries(base.TackerObject, base.TackerObjectDictCompat):
# Version 1.0: Initial version # Version 1.0: Initial version
VERSION = '1.0' VERSION = '1.0'
fields = { fields = {
'objectType': fields.StringField(nullable=False), 'objectType': fields.StringField(nullable=False),
'objectInstanceId': fields.StringField(nullable=False), 'objectInstanceId': fields.StringField(nullable=False),
'subObjectInstanceId': fields.StringField(nullable=True), 'subObjectInstanceId': fields.StringField(nullable=True),
'performanceMetric': fields.StringField(nullable=False), 'performanceMetric': fields.StringField(nullable=False),
'performanceValues': fields.ListOfObjectsField( 'performanceValues': fields.ListOfObjectsField(
'VnfPmReportV2_Entries_PerformanceValues', nullable=False), 'VnfPmReportV2_Entries_PerformanceValues', nullable=False),
} }
@base.TackerObjectRegistry.register @base.TackerObjectRegistry.register
class VnfPmReportV2_Entries_PerformanceValues(base.TackerObject, class VnfPmReportV2_Entries_PerformanceValues(base.TackerObject,
base.TackerObjectDictCompat): base.TackerObjectDictCompat):
# Version 1.0: Initial version # Version 1.0: Initial version
VERSION = '1.0' VERSION = '1.0'
fields = { fields = {
'timeStamp': fields.DateTimeField(nullable=False), 'timeStamp': fields.DateTimeField(nullable=False),
'value': fields.StringField(nullable=False), 'value': fields.StringField(nullable=False),
'context': fields.KeyValuePairsField(nullable=True), 'context': fields.KeyValuePairsField(nullable=True),
} }

View File

@ -1,457 +1,457 @@
# Copyright (C) 2023 Fujitsu # Copyright (C) 2023 Fujitsu
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import base64 import base64
import copy import copy
import os import os
from unittest import mock from unittest import mock
import uuid import uuid
from oslo_config import cfg from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture from requests_mock.contrib import fixture as rm_fixture
from keystoneauth1 import exceptions as ksa_exceptions from keystoneauth1 import exceptions as ksa_exceptions
from tacker.common.exceptions import TackerException from tacker.common.exceptions import TackerException
from tacker import context from tacker import context
from tacker.tests.unit import base from tacker.tests.unit import base
JWT_KEY_FILE = 'jwt_private.key' JWT_KEY_FILE = 'jwt_private.key'
def _get_sample_key(name): def _get_sample_key(name):
filename = os.path.join( filename = os.path.join(
os.path.dirname(os.path.abspath(__file__)), os.path.dirname(os.path.abspath(__file__)),
"./sample_keys/", name) "./sample_keys/", name)
with open(filename, "r") as f: with open(filename, "r") as f:
content = f.read() content = f.read()
return content return content
def get_mock_conf_effect(audience=None, token_endpoint=None, def get_mock_conf_effect(audience=None, token_endpoint=None,
auth_method=None, client_id=None, client_secret=None, auth_method=None, client_id=None, client_secret=None,
scope=None, jwt_key_file=None, jwt_algorithm=None, scope=None, jwt_key_file=None, jwt_algorithm=None,
jwt_bearer_time_out=None, certfile=None, keyfile=None, jwt_bearer_time_out=None, certfile=None, keyfile=None,
cafile=None, http_connect_timeout=None, insecure=None): cafile=None, http_connect_timeout=None, insecure=None):
def mock_conf_key_effect(name): def mock_conf_key_effect(name):
if name == 'keystone_authtoken': if name == 'keystone_authtoken':
return MockConfig(conf=None) return MockConfig(conf=None)
elif name == 'ext_oauth2_auth': elif name == 'ext_oauth2_auth':
config = {'use_ext_oauth2_auth': True} config = {'use_ext_oauth2_auth': True}
if audience: if audience:
config['audience'] = audience config['audience'] = audience
if token_endpoint: if token_endpoint:
config['token_endpoint'] = token_endpoint config['token_endpoint'] = token_endpoint
if auth_method: if auth_method:
config['auth_method'] = auth_method config['auth_method'] = auth_method
if client_id: if client_id:
config['client_id'] = client_id config['client_id'] = client_id
if client_secret: if client_secret:
config['client_secret'] = client_secret config['client_secret'] = client_secret
if scope: if scope:
config['scope'] = scope config['scope'] = scope
if jwt_key_file: if jwt_key_file:
config['jwt_key_file'] = jwt_key_file config['jwt_key_file'] = jwt_key_file
if jwt_algorithm: if jwt_algorithm:
config['jwt_algorithm'] = jwt_algorithm config['jwt_algorithm'] = jwt_algorithm
if jwt_bearer_time_out: if jwt_bearer_time_out:
config['jwt_bearer_time_out'] = jwt_bearer_time_out config['jwt_bearer_time_out'] = jwt_bearer_time_out
if certfile: if certfile:
config['certfile'] = certfile config['certfile'] = certfile
if keyfile: if keyfile:
config['keyfile'] = keyfile config['keyfile'] = keyfile
if cafile: if cafile:
config['cafile'] = cafile config['cafile'] = cafile
if cafile: if cafile:
config['http_connect_timeout'] = http_connect_timeout config['http_connect_timeout'] = http_connect_timeout
if cafile: if cafile:
config['insecure'] = insecure config['insecure'] = insecure
return MockConfig( return MockConfig(
conf=config) conf=config)
else: else:
return cfg.CONF._get(name) return cfg.CONF._get(name)
return mock_conf_key_effect return mock_conf_key_effect
class MockConfig(object): class MockConfig(object):
def __init__(self, conf=None): def __init__(self, conf=None):
self.conf = conf self.conf = conf
def __getattr__(self, name): def __getattr__(self, name):
if not self.conf or name not in self.conf: if not self.conf or name not in self.conf:
raise cfg.NoSuchOptError(f'not found {name}') raise cfg.NoSuchOptError(f'not found {name}')
return self.conf.get(name) return self.conf.get(name)
def __contains__(self, key): def __contains__(self, key):
return key in self.conf return key in self.conf
class MockSession(object): class MockSession(object):
def __init__(self, ): def __init__(self, ):
self.auth = None self.auth = None
class TestExtOAuth2Auth(base.TestCase): class TestExtOAuth2Auth(base.TestCase):
def setUp(self): def setUp(self):
super(TestExtOAuth2Auth, self).setUp() super(TestExtOAuth2Auth, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture()) self.requests_mock = self.useFixture(rm_fixture.Fixture())
self.token_endpoint = 'http://demo/token_endpoint' self.token_endpoint = 'http://demo/token_endpoint'
self.auth_method = 'client_secret_post' self.auth_method = 'client_secret_post'
self.client_id = 'test_client_id' self.client_id = 'test_client_id'
self.client_secret = 'test_client_secret' self.client_secret = 'test_client_secret'
self.scope = 'tacker_api' self.scope = 'tacker_api'
self.access_token = f'access_token_{str(uuid.uuid4())}' self.access_token = f'access_token_{str(uuid.uuid4())}'
self.audience = 'http://demo/audience' self.audience = 'http://demo/audience'
self.jwt_bearer_time_out = 2800 self.jwt_bearer_time_out = 2800
self.addCleanup(mock.patch.stopall) self.addCleanup(mock.patch.stopall)
def _get_access_token_response(self, request, context, def _get_access_token_response(self, request, context,
auth_method=None, auth_method=None,
client_id=None, client_id=None,
client_secret=None, client_secret=None,
scope=None, scope=None,
access_token=None, access_token=None,
status_code=200, status_code=200,
raise_error=None, raise_error=None,
resp=None resp=None
): ):
if raise_error: if raise_error:
raise raise_error raise raise_error
if auth_method == 'tls_client_auth': if auth_method == 'tls_client_auth':
body = (f'client_id={client_id}&scope={scope}' body = (f'client_id={client_id}&scope={scope}'
f'&grant_type=client_credentials') f'&grant_type=client_credentials')
self.assertEqual(request.text, body) self.assertEqual(request.text, body)
elif auth_method == 'client_secret_post': elif auth_method == 'client_secret_post':
body = (f'client_id={client_id}&client_secret={client_secret}' body = (f'client_id={client_id}&client_secret={client_secret}'
f'&scope={scope}&grant_type=client_credentials') f'&scope={scope}&grant_type=client_credentials')
self.assertEqual(request.text, body) self.assertEqual(request.text, body)
elif auth_method == 'client_secret_basic': elif auth_method == 'client_secret_basic':
body = f'scope={scope}&grant_type=client_credentials' body = f'scope={scope}&grant_type=client_credentials'
self.assertEqual(request.text, body) self.assertEqual(request.text, body)
auth_basic = request._request.headers.get('Authorization') auth_basic = request._request.headers.get('Authorization')
self.assertIsNotNone(auth_basic) self.assertIsNotNone(auth_basic)
auth = 'Basic ' + base64.standard_b64encode( auth = 'Basic ' + base64.standard_b64encode(
f'{client_id}:{client_secret}'.encode('ascii')).decode('ascii') f'{client_id}:{client_secret}'.encode('ascii')).decode('ascii')
self.assertEqual(auth_basic, auth) self.assertEqual(auth_basic, auth)
elif auth_method == 'private_key_jwt': elif auth_method == 'private_key_jwt':
self.assertIn(f'client_id={client_id}', request.text) self.assertIn(f'client_id={client_id}', request.text)
self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A' self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A'
'oauth%3Aclient-assertion-type%3Ajwt-bearer'), 'oauth%3Aclient-assertion-type%3Ajwt-bearer'),
request.text) request.text)
self.assertIn('client_assertion=', request.text) self.assertIn('client_assertion=', request.text)
self.assertIn(f'scope={scope}', request.text) self.assertIn(f'scope={scope}', request.text)
self.assertIn('grant_type=client_credentials', request.text) self.assertIn('grant_type=client_credentials', request.text)
elif auth_method == 'client_secret_jwt': elif auth_method == 'client_secret_jwt':
self.assertIn(f'client_id={client_id}', request.text) self.assertIn(f'client_id={client_id}', request.text)
self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A' self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A'
'oauth%3Aclient-assertion-type%3Ajwt-bearer'), 'oauth%3Aclient-assertion-type%3Ajwt-bearer'),
request.text) request.text)
self.assertIn('client_assertion=', request.text) self.assertIn('client_assertion=', request.text)
self.assertIn(f'scope={scope}', request.text) self.assertIn(f'scope={scope}', request.text)
self.assertIn('grant_type=client_credentials', request.text) self.assertIn('grant_type=client_credentials', request.text)
if not access_token: if not access_token:
access_token = f'access_token{str(uuid.uuid4())}' access_token = f'access_token{str(uuid.uuid4())}'
if not resp: if not resp:
if status_code == 200: if status_code == 200:
response = { response = {
'access_token': access_token, 'access_token': access_token,
'expires_in': 1800, 'expires_in': 1800,
'refresh_expires_in': 0, 'refresh_expires_in': 0,
'token_type': 'Bearer', 'token_type': 'Bearer',
'not-before-policy': 0, 'not-before-policy': 0,
'scope': scope 'scope': scope
} }
else: else:
response = {'error': 'error_title', response = {'error': 'error_title',
'error_description': 'error message'} 'error_description': 'error message'}
else: else:
response = copy.deepcopy(resp) response = copy.deepcopy(resp)
context.status_code = status_code context.status_code = status_code
return response return response
def _get_default_mock_conf_effect(self): def _get_default_mock_conf_effect(self):
return get_mock_conf_effect( return get_mock_conf_effect(
token_endpoint=self.token_endpoint, token_endpoint=self.token_endpoint,
auth_method=self.auth_method, auth_method=self.auth_method,
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope) scope=self.scope)
def _check_authorization_header(self): def _check_authorization_header(self):
auth_context = context.generate_tacker_service_context() auth_context = context.generate_tacker_service_context()
session = auth_context.create_session() session = auth_context.create_session()
headers = auth_context.get_headers(session) headers = auth_context.get_headers(session)
bearer = f'Bearer {self.access_token}' bearer = f'Bearer {self.access_token}'
self.assertIn('Authorization', headers) self.assertIn('Authorization', headers)
self.assertEqual(bearer, headers.get('Authorization')) self.assertEqual(bearer, headers.get('Authorization'))
return auth_context return auth_context
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_token_endpoint(self, mock_get_conf_key): def test_init_without_token_endpoint(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect( mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint='', token_endpoint='',
auth_method=self.auth_method, auth_method=self.auth_method,
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope scope=self.scope
) )
self.assertRaises(TackerException, self.assertRaises(TackerException,
context.generate_tacker_service_context) context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_scope(self, mock_get_conf_key): def test_init_without_scope(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect( mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint, token_endpoint=self.token_endpoint,
auth_method=self.auth_method, auth_method=self.auth_method,
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret) client_secret=self.client_secret)
self.assertRaises(TackerException, self.assertRaises(TackerException,
context.generate_tacker_service_context) context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_init_without_keystone_middleware_opts(self, mock_get_conf_key): def test_init_without_keystone_middleware_opts(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect( mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint, token_endpoint=self.token_endpoint,
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope) scope=self.scope)
self.assertRaises(TackerException, self.assertRaises(TackerException,
context.generate_tacker_service_context) context.generate_tacker_service_context)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('keystoneauth1.loading.session.Session.load_from_options') @mock.patch('keystoneauth1.loading.session.Session.load_from_options')
def test_create_session(self, mock_load_from_options, mock_get_conf_key): def test_create_session(self, mock_load_from_options, mock_get_conf_key):
certfile = f'/demo/certfile{str(uuid.uuid4())}' certfile = f'/demo/certfile{str(uuid.uuid4())}'
keyfile = f'/demo/keyfile{str(uuid.uuid4())}' keyfile = f'/demo/keyfile{str(uuid.uuid4())}'
cafile = f'/demo/cafile{str(uuid.uuid4())}' cafile = f'/demo/cafile{str(uuid.uuid4())}'
conf_insecure = True conf_insecure = True
http_connect_timeout = 1000 http_connect_timeout = 1000
def load_side_effect(**kwargs): def load_side_effect(**kwargs):
self.assertEqual(conf_insecure, kwargs.get('insecure')) self.assertEqual(conf_insecure, kwargs.get('insecure'))
self.assertEqual(cafile, kwargs.get('cacert')) self.assertEqual(cafile, kwargs.get('cacert'))
self.assertEqual(certfile, kwargs.get('cert')) self.assertEqual(certfile, kwargs.get('cert'))
self.assertEqual(keyfile, kwargs.get('key')) self.assertEqual(keyfile, kwargs.get('key'))
self.assertEqual(http_connect_timeout, kwargs.get('timeout')) self.assertEqual(http_connect_timeout, kwargs.get('timeout'))
return MockSession() return MockSession()
mock_load_from_options.side_effect = load_side_effect mock_load_from_options.side_effect = load_side_effect
mock_get_conf_key.side_effect = get_mock_conf_effect( mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint, token_endpoint=self.token_endpoint,
auth_method='tls_client_auth', auth_method='tls_client_auth',
client_id=self.client_id, client_id=self.client_id,
scope=self.scope, scope=self.scope,
certfile=certfile, certfile=certfile,
keyfile=keyfile, keyfile=keyfile,
cafile=cafile, cafile=cafile,
insecure=conf_insecure, insecure=conf_insecure,
http_connect_timeout=http_connect_timeout) http_connect_timeout=http_connect_timeout)
auth_context = context.generate_tacker_service_context() auth_context = context.generate_tacker_service_context()
auth_context.create_session() auth_context.create_session()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_connection_params(self, mock_get_conf_key): def test_get_connection_params(self, mock_get_conf_key):
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect() mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context() auth_context = context.generate_tacker_service_context()
session = auth_context.create_session() session = auth_context.create_session()
params = auth_context.get_connection_params(session) params = auth_context.get_connection_params(session)
self.assertDictEqual(params, {}) self.assertDictEqual(params, {})
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_tls_client_auth(self, mock_get_conf_key): def test_get_headers_tls_client_auth(self, mock_get_conf_key):
def mock_resp(request, context): def mock_resp(request, context):
return self._get_access_token_response( return self._get_access_token_response(
request, context, request, context,
auth_method='tls_client_auth', auth_method='tls_client_auth',
client_id=self.client_id, client_id=self.client_id,
scope=self.scope, scope=self.scope,
access_token=self.access_token) access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp) self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect( mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint, token_endpoint=self.token_endpoint,
auth_method='tls_client_auth', auth_method='tls_client_auth',
client_id=self.client_id, client_id=self.client_id,
scope=self.scope) scope=self.scope)
auth_context = self._check_authorization_header() auth_context = self._check_authorization_header()
result = auth_context.invalidate() result = auth_context.invalidate()
self.assertEqual(True, result) self.assertEqual(True, result)
self.assertIsNone(auth_context.access_token) self.assertIsNone(auth_context.access_token)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_post(self, mock_get_conf_key): def test_get_headers_client_secret_post(self, mock_get_conf_key):
def mock_resp(request, context): def mock_resp(request, context):
return self._get_access_token_response( return self._get_access_token_response(
request, context, request, context,
auth_method='client_secret_post', auth_method='client_secret_post',
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope, scope=self.scope,
access_token=self.access_token access_token=self.access_token
) )
self.requests_mock.post(self.token_endpoint, json=mock_resp) self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect( mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint, token_endpoint=self.token_endpoint,
auth_method='client_secret_post', auth_method='client_secret_post',
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope) scope=self.scope)
self._check_authorization_header() self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_basic(self, mock_get_conf_key): def test_get_headers_client_secret_basic(self, mock_get_conf_key):
def mock_resp(request, context): def mock_resp(request, context):
return self._get_access_token_response( return self._get_access_token_response(
request, context, request, context,
auth_method='client_secret_basic', auth_method='client_secret_basic',
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope, scope=self.scope,
access_token=self.access_token) access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp) self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect( mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint, token_endpoint=self.token_endpoint,
auth_method='client_secret_basic', auth_method='client_secret_basic',
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope) scope=self.scope)
self._check_authorization_header() self._check_authorization_header()
@mock.patch('builtins.open', mock.mock_open(read_data=_get_sample_key( @mock.patch('builtins.open', mock.mock_open(read_data=_get_sample_key(
JWT_KEY_FILE))) JWT_KEY_FILE)))
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_private_key_jwt(self, mock_get_conf_key): def test_get_headers_private_key_jwt(self, mock_get_conf_key):
def mock_resp(request, context): def mock_resp(request, context):
return self._get_access_token_response( return self._get_access_token_response(
request, context, request, context,
auth_method='private_key_jwt', auth_method='private_key_jwt',
client_id=self.client_id, client_id=self.client_id,
scope=self.scope, scope=self.scope,
access_token=self.access_token) access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp) self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect( mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint, token_endpoint=self.token_endpoint,
auth_method='private_key_jwt', auth_method='private_key_jwt',
client_id=self.client_id, client_id=self.client_id,
audience=self.audience, audience=self.audience,
jwt_key_file=f'/demo/jwt_key_file{str(uuid.uuid4())}', jwt_key_file=f'/demo/jwt_key_file{str(uuid.uuid4())}',
jwt_algorithm='RS256', jwt_algorithm='RS256',
jwt_bearer_time_out=self.jwt_bearer_time_out, jwt_bearer_time_out=self.jwt_bearer_time_out,
scope=self.scope) scope=self.scope)
self._check_authorization_header() self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_client_secret_jwt(self, mock_get_conf_key): def test_get_headers_client_secret_jwt(self, mock_get_conf_key):
def mock_resp(request, context): def mock_resp(request, context):
return self._get_access_token_response( return self._get_access_token_response(
request, context, request, context,
auth_method='client_secret_jwt', auth_method='client_secret_jwt',
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope, scope=self.scope,
access_token=self.access_token) access_token=self.access_token)
self.requests_mock.post(self.token_endpoint, json=mock_resp) self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = get_mock_conf_effect( mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint, token_endpoint=self.token_endpoint,
auth_method='client_secret_jwt', auth_method='client_secret_jwt',
client_id=self.client_id, client_id=self.client_id,
audience=self.audience, audience=self.audience,
client_secret=self.client_secret, client_secret=self.client_secret,
jwt_algorithm='HS256', jwt_algorithm='HS256',
jwt_bearer_time_out=self.jwt_bearer_time_out, jwt_bearer_time_out=self.jwt_bearer_time_out,
scope=self.scope) scope=self.scope)
self._check_authorization_header() self._check_authorization_header()
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_invalid_auth_method(self, mock_get_conf_key): def test_get_headers_invalid_auth_method(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_effect( mock_get_conf_key.side_effect = get_mock_conf_effect(
token_endpoint=self.token_endpoint, token_endpoint=self.token_endpoint,
auth_method='client_secret_other', auth_method='client_secret_other',
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope scope=self.scope
) )
auth_context = context.generate_tacker_service_context() auth_context = context.generate_tacker_service_context()
session = auth_context.create_session() session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session) self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_connect_fail(self, mock_get_conf_key): def test_get_headers_connect_fail(self, mock_get_conf_key):
def mock_resp(request, context): def mock_resp(request, context):
return self._get_access_token_response( return self._get_access_token_response(
request, context, request, context,
auth_method=self.auth_method, auth_method=self.auth_method,
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope, scope=self.scope,
access_token=self.access_token, access_token=self.access_token,
raise_error=ksa_exceptions.RequestTimeout('connect time out.')) raise_error=ksa_exceptions.RequestTimeout('connect time out.'))
self.requests_mock.post(self.token_endpoint, json=mock_resp) self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect() mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context() auth_context = context.generate_tacker_service_context()
session = auth_context.create_session() session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session) self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_is_not_200(self, mock_get_conf_key): def test_get_headers_is_not_200(self, mock_get_conf_key):
def mock_resp(request, context): def mock_resp(request, context):
return self._get_access_token_response( return self._get_access_token_response(
request, context, request, context,
auth_method=self.auth_method, auth_method=self.auth_method,
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope, scope=self.scope,
access_token=self.access_token, access_token=self.access_token,
status_code=201) status_code=201)
self.requests_mock.post(self.token_endpoint, json=mock_resp) self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect() mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context() auth_context = context.generate_tacker_service_context()
session = auth_context.create_session() session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session) self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_not_include_access_token(self, mock_get_conf_key): def test_get_headers_not_include_access_token(self, mock_get_conf_key):
def mock_resp(request, context): def mock_resp(request, context):
return self._get_access_token_response( return self._get_access_token_response(
request, context, request, context,
auth_method=self.auth_method, auth_method=self.auth_method,
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope, scope=self.scope,
access_token=self.access_token, access_token=self.access_token,
status_code=200, status_code=200,
resp={'error': 'invalid_client', resp={'error': 'invalid_client',
'error_description': 'The client is not found.'}) 'error_description': 'The client is not found.'})
self.requests_mock.post(self.token_endpoint, json=mock_resp) self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect() mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context() auth_context = context.generate_tacker_service_context()
session = auth_context.create_session() session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session) self.assertRaises(TackerException, auth_context.get_headers, session)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_headers_unknown_error(self, mock_get_conf_key): def test_get_headers_unknown_error(self, mock_get_conf_key):
def mock_resp(request, context): def mock_resp(request, context):
return self._get_access_token_response( return self._get_access_token_response(
request, context, request, context,
auth_method=self.auth_method, auth_method=self.auth_method,
client_id=self.client_id, client_id=self.client_id,
client_secret=self.client_secret, client_secret=self.client_secret,
scope=self.scope, scope=self.scope,
access_token=self.access_token, access_token=self.access_token,
raise_error=Exception('unknown error occurred.')) raise_error=Exception('unknown error occurred.'))
self.requests_mock.post(self.token_endpoint, json=mock_resp) self.requests_mock.post(self.token_endpoint, json=mock_resp)
mock_get_conf_key.side_effect = self._get_default_mock_conf_effect() mock_get_conf_key.side_effect = self._get_default_mock_conf_effect()
auth_context = context.generate_tacker_service_context() auth_context = context.generate_tacker_service_context()
session = auth_context.create_session() session = auth_context.create_session()
self.assertRaises(TackerException, auth_context.get_headers, session) self.assertRaises(TackerException, auth_context.get_headers, session)

View File

@ -1,255 +1,255 @@
# Copyright (C) 2023 Fujitsu # Copyright (C) 2023 Fujitsu
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from unittest import mock from unittest import mock
import uuid import uuid
from oslo_config import cfg from oslo_config import cfg
from requests_mock.contrib import fixture as rm_fixture from requests_mock.contrib import fixture as rm_fixture
from tacker import context as t_context from tacker import context as t_context
from tacker.keymgr.barbican_key_manager import BarbicanKeyManager from tacker.keymgr.barbican_key_manager import BarbicanKeyManager
from tacker.keymgr import exception from tacker.keymgr import exception
from tacker.tests.unit import base from tacker.tests.unit import base
def get_mock_conf_key_effect(barbican_endpoint=None): def get_mock_conf_key_effect(barbican_endpoint=None):
def mock_conf_key_effect(name): def mock_conf_key_effect(name):
if name == 'ext_oauth2_auth': if name == 'ext_oauth2_auth':
return MockConfig( return MockConfig(
conf={ conf={
'use_ext_oauth2_auth': True, 'use_ext_oauth2_auth': True,
'token_endpoint': 'http://demo/token_endpoint', 'token_endpoint': 'http://demo/token_endpoint',
'auth_method': 'client_secret_post', 'auth_method': 'client_secret_post',
'client_id': 'client_id', 'client_id': 'client_id',
'client_secret': 'client_secret', 'client_secret': 'client_secret',
'scope': 'client_secret' 'scope': 'client_secret'
}) })
elif name == 'key_manager': elif name == 'key_manager':
conf = { conf = {
'api_class': 'tacker.keymgr.barbican_key_manager' 'api_class': 'tacker.keymgr.barbican_key_manager'
'.BarbicanKeyManager', '.BarbicanKeyManager',
'barbican_version': 'v1', 'barbican_version': 'v1',
'barbican_endpoint': barbican_endpoint 'barbican_endpoint': barbican_endpoint
} }
return MockConfig(conf=conf) return MockConfig(conf=conf)
elif name == 'k8s_vim': elif name == 'k8s_vim':
return MockConfig( return MockConfig(
conf={ conf={
'use_barbican': True 'use_barbican': True
}) })
else: else:
return cfg.CONF._get(name) return cfg.CONF._get(name)
return mock_conf_key_effect return mock_conf_key_effect
class MockConfig(object): class MockConfig(object):
def __init__(self, conf=None): def __init__(self, conf=None):
self.conf = conf self.conf = conf
def __getattr__(self, name): def __getattr__(self, name):
if not self.conf and name not in self.conf: if not self.conf and name not in self.conf:
raise cfg.NoSuchOptError(f'not found {name}') raise cfg.NoSuchOptError(f'not found {name}')
return self.conf.get(name) return self.conf.get(name)
def __contains__(self, key): def __contains__(self, key):
return key in self.conf return key in self.conf
class TestBarbicanKeyManager(base.TestCase): class TestBarbicanKeyManager(base.TestCase):
def setUp(self): def setUp(self):
super(TestBarbicanKeyManager, self).setUp() super(TestBarbicanKeyManager, self).setUp()
self.requests_mock = self.useFixture(rm_fixture.Fixture()) self.requests_mock = self.useFixture(rm_fixture.Fixture())
self.token_endpoint = 'http://demo/token_endpoint' self.token_endpoint = 'http://demo/token_endpoint'
self.auth_method = 'client_secret_post' self.auth_method = 'client_secret_post'
self.client_id = 'test_client_id' self.client_id = 'test_client_id'
self.client_secret = 'test_client_secret' self.client_secret = 'test_client_secret'
self.scope = 'tacker_api' self.scope = 'tacker_api'
self.access_token = f'access_token_{str(uuid.uuid4())}' self.access_token = f'access_token_{str(uuid.uuid4())}'
self.audience = 'http://demo/audience' self.audience = 'http://demo/audience'
self.jwt_bearer_time_out = 2800 self.jwt_bearer_time_out = 2800
self.addCleanup(mock.patch.stopall) self.addCleanup(mock.patch.stopall)
def _mock_external_token_api(self): def _mock_external_token_api(self):
def mock_token_resp(request, context): def mock_token_resp(request, context):
response = { response = {
'access_token': self.access_token, 'access_token': self.access_token,
'expires_in': 1800, 'expires_in': 1800,
'refresh_expires_in': 0, 'refresh_expires_in': 0,
'token_type': 'Bearer', 'token_type': 'Bearer',
'not-before-policy': 0, 'not-before-policy': 0,
'scope': 'tacker_api' 'scope': 'tacker_api'
} }
context.status_code = 200 context.status_code = 200
return response return response
self.requests_mock.post('http://demo/token_endpoint', self.requests_mock.post('http://demo/token_endpoint',
json=mock_token_resp) json=mock_token_resp)
def _mock_barbican_get_version_resp(self): def _mock_barbican_get_version_resp(self):
def mock_barbican_get_resp(request, context): def mock_barbican_get_resp(request, context):
auth_value = f'Bearer {self.access_token}' auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization') req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth) self.assertEqual(auth_value, req_auth)
context.status_code = 200 context.status_code = 200
response = { response = {
"versions": { "versions": {
"values": [ "values": [
{ {
"id": "v1", "id": "v1",
"status": "stable", "status": "stable",
"links": [ "links": [
{ {
"rel": "self", "rel": "self",
"href": "http://demo/barbican/v1/" "href": "http://demo/barbican/v1/"
}, },
{ {
"rel": "describedby", "rel": "describedby",
"type": "text/html", "type": "text/html",
"href": "https://docs.openstack.org/"} "href": "https://docs.openstack.org/"}
], ],
"media-types": [ "media-types": [
{ {
"base": "application/json", "base": "application/json",
"type": "application/" "type": "application/"
"vnd.openstack.key-manager-v1+json" "vnd.openstack.key-manager-v1+json"
} }
] ]
} }
] ]
} }
} }
return response return response
return mock_barbican_get_resp return mock_barbican_get_resp
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid') @mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
def test_delete_ext_oauth2_auth(self, mock_validate, mock_get_conf_key): def test_delete_ext_oauth2_auth(self, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect( mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican/') barbican_endpoint='http://demo/barbican/')
self._mock_external_token_api() self._mock_external_token_api()
mock_validate.return_value = True mock_validate.return_value = True
def mock_barbican_delete_resp(request, context): def mock_barbican_delete_resp(request, context):
auth_value = f'Bearer {self.access_token}' auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization') req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth) self.assertEqual(auth_value, req_auth)
context.status_code = 204 context.status_code = 204
return '' return ''
def mock_barbican_get_for_check_resp(request, context): def mock_barbican_get_for_check_resp(request, context):
auth_value = f'Bearer {self.access_token}' auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization') req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth) self.assertEqual(auth_value, req_auth)
context.status_code = 200 context.status_code = 200
return {} return {}
self.requests_mock.get( self.requests_mock.get(
'http://demo/barbican', 'http://demo/barbican',
json=self._mock_barbican_get_version_resp()) json=self._mock_barbican_get_version_resp())
self.requests_mock.delete( self.requests_mock.delete(
'http://demo/barbican/v1/secrets/True', 'http://demo/barbican/v1/secrets/True',
json=mock_barbican_delete_resp) json=mock_barbican_delete_resp)
self.requests_mock.get( self.requests_mock.get(
'http://demo/barbican/v1/secrets/True', 'http://demo/barbican/v1/secrets/True',
json=mock_barbican_get_for_check_resp) json=mock_barbican_get_for_check_resp)
auth = t_context.generate_tacker_service_context() auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint) keymgr = BarbicanKeyManager(auth.token_endpoint)
keymgr.delete(auth, 'test') keymgr.delete(auth, 'test')
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('tacker.keymgr.barbican_key_manager.' @mock.patch('tacker.keymgr.barbican_key_manager.'
'BarbicanKeyManager._retrieve_secret_uuid') 'BarbicanKeyManager._retrieve_secret_uuid')
def test_store_ext_oauth2_auth(self, mock_secret_uuid, def test_store_ext_oauth2_auth(self, mock_secret_uuid,
mock_get_conf_key): mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect( mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican') barbican_endpoint='http://demo/barbican')
secret_id = 'store_secret_uuid' secret_id = 'store_secret_uuid'
mock_secret_uuid.return_value = secret_id mock_secret_uuid.return_value = secret_id
self._mock_external_token_api() self._mock_external_token_api()
def mock_barbican_post_resp(request, context): def mock_barbican_post_resp(request, context):
auth_value = f'Bearer {self.access_token}' auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization') req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth) self.assertEqual(auth_value, req_auth)
response = { response = {
'name': 'AES key', 'name': 'AES key',
'expiration': '2023-01-13T19:14:44.180394', 'expiration': '2023-01-13T19:14:44.180394',
'algorithm': 'aes', 'algorithm': 'aes',
'bit_length': 256, 'bit_length': 256,
'mode': 'cbc', 'mode': 'cbc',
'payload': 'YmVlcg==', 'payload': 'YmVlcg==',
'payload_content_type': 'application/octet-stream', 'payload_content_type': 'application/octet-stream',
'payload_content_encoding': 'base64' 'payload_content_encoding': 'base64'
} }
context.status_code = 201 context.status_code = 201
return response return response
self.requests_mock.get( self.requests_mock.get(
'http://demo/barbican', 'http://demo/barbican',
json=self._mock_barbican_get_version_resp()) json=self._mock_barbican_get_version_resp())
self.requests_mock.post('http://demo/barbican/v1/secrets/', self.requests_mock.post('http://demo/barbican/v1/secrets/',
json=mock_barbican_post_resp) json=mock_barbican_post_resp)
auth = t_context.generate_tacker_service_context() auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint) keymgr = BarbicanKeyManager(auth.token_endpoint)
result = keymgr.store(auth, 'test') result = keymgr.store(auth, 'test')
self.assertEqual(result, secret_id) self.assertEqual(result, secret_id)
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
@mock.patch('barbicanclient.base.validate_ref_and_return_uuid') @mock.patch('barbicanclient.base.validate_ref_and_return_uuid')
def test_get_ext_oauth2_auth(self, mock_validate, mock_get_conf_key): def test_get_ext_oauth2_auth(self, mock_validate, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect( mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='http://demo/barbican/') barbican_endpoint='http://demo/barbican/')
self._mock_external_token_api() self._mock_external_token_api()
mock_validate.return_value = True mock_validate.return_value = True
def mock_barbican_get_resp(request, context): def mock_barbican_get_resp(request, context):
auth_value = f'Bearer {self.access_token}' auth_value = f'Bearer {self.access_token}'
req_auth = request._request.headers.get('Authorization') req_auth = request._request.headers.get('Authorization')
self.assertEqual(auth_value, req_auth) self.assertEqual(auth_value, req_auth)
context.status_code = 200 context.status_code = 200
response = { response = {
'id': 'test001' 'id': 'test001'
} }
return response return response
self.requests_mock.get( self.requests_mock.get(
'http://demo/barbican', 'http://demo/barbican',
json=self._mock_barbican_get_version_resp()) json=self._mock_barbican_get_version_resp())
self.requests_mock.get( self.requests_mock.get(
'http://demo/barbican/v1/secrets/True', 'http://demo/barbican/v1/secrets/True',
json=mock_barbican_get_resp) json=mock_barbican_get_resp)
auth = t_context.generate_tacker_service_context() auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint) keymgr = BarbicanKeyManager(auth.token_endpoint)
result = keymgr.get(auth, 'test001') result = keymgr.get(auth, 'test001')
self.assertEqual(result.secret_ref, self.assertEqual(result.secret_ref,
'http://demo/barbican/v1/secrets/test001') 'http://demo/barbican/v1/secrets/test001')
@mock.patch('oslo_config.cfg.ConfigOpts.__getattr__') @mock.patch('oslo_config.cfg.ConfigOpts.__getattr__')
def test_get_ext_oauth2_auth_no_endpoint(self, mock_get_conf_key): def test_get_ext_oauth2_auth_no_endpoint(self, mock_get_conf_key):
mock_get_conf_key.side_effect = get_mock_conf_key_effect( mock_get_conf_key.side_effect = get_mock_conf_key_effect(
barbican_endpoint='') barbican_endpoint='')
self._mock_external_token_api() self._mock_external_token_api()
auth = t_context.generate_tacker_service_context() auth = t_context.generate_tacker_service_context()
keymgr = BarbicanKeyManager(auth.token_endpoint) keymgr = BarbicanKeyManager(auth.token_endpoint)
self.assertRaises(exception.KeyManagerError, keymgr.get, auth, 'test') self.assertRaises(exception.KeyManagerError, keymgr.get, auth, 'test')