[WIP]Enhancement of Tacker API resource access control

For fine-grained access control based on user and VNF information
for API resources, this patch does the following things:
1.Add three comparison attributes of area, vendor, and namespace
for the enhanced tackler policy.
2.Convert special roles to API attributes in context.
3.Modify the API process to support Tacker policy authorize.
4.Add the Tacker policy filter to the list API processes.

Implements: https://blueprints.launchpad.net/tacker/+spec/enhance-api-policy
Change-Id: I5b4c39387860133a3bcf4544f18a6353c80773f6
This commit is contained in:
kexuesheng 2023-01-20 16:00:36 +09:00
parent b34b40944b
commit 878c4d852a
17 changed files with 3012 additions and 33 deletions

View File

@ -288,7 +288,7 @@ class VnfLcmController(wsgi.Controller):
vim_client_obj = vim_client.VimClient()
try:
vim_client_obj.get_vim(
return vim_client_obj.get_vim(
context, vim_id, region_name=region_name)
except nfvo.VimDefaultNotDefined as exp:
raise webob.exc.HTTPBadRequest(explanation=exp.message)
@ -439,7 +439,8 @@ class VnfLcmController(wsgi.Controller):
@validation.schema(vnf_lcm.create)
def create(self, request, body):
context = request.environ['tacker.context']
context.can(vnf_lcm_policies.VNFLCM % 'create')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'create')
try:
req_body = utils.convert_camelcase_to_snakecase(body)
vnfd_id = req_body.get('vnfd_id')
@ -466,6 +467,10 @@ class VnfLcmController(wsgi.Controller):
vnfd_id)
return self._make_problem_detail(
msg, 500, 'Internal Server Error')
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'create',
target={'vendor': vnfd.vnf_provider})
try:
# get default vim information
vim_client_obj = vim_client.VimClient()
@ -489,6 +494,17 @@ class VnfLcmController(wsgi.Controller):
try:
vnf_instance.create()
vim_conn = objects.VimConnectionInfo(
id=uuidutils.generate_uuid(),
vim_id=default_vim.get('vim_id'),
vim_type=default_vim.get('vim_type'),
access_info={},
interface_info={},
extra=default_vim.get('extra', {})
)
vnf_instance.vim_connection_info = [vim_conn]
vnf_instance.save()
# create entry to 'vnf' table and 'vnf_attribute' table
attributes = {'placement_attr': default_vim.
get('placement_attr', {})}
@ -567,12 +583,36 @@ class VnfLcmController(wsgi.Controller):
LOG.debug('Old nextpages are deleted. id: %s' % k)
nextpages.pop(k)
def _get_policy_target(self, vnf_instance):
vendor = vnf_instance.vnf_provider
area = None
if vnf_instance.vim_connection_info:
for connection_info in vnf_instance.vim_connection_info:
area = connection_info.extra.get('area')
if area:
break
namespace = None
if vnf_instance.vnf_metadata:
namespace = vnf_instance.vnf_metadata.get('namespace')
target = {
'vendor': vendor,
'area': area,
'namespace': namespace
}
return target
@wsgi.response(http_client.OK)
@wsgi.expected_errors((http_client.FORBIDDEN, http_client.NOT_FOUND))
def show(self, request, id):
context = request.environ['tacker.context']
context.can(vnf_lcm_policies.VNFLCM % 'show')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'show')
vnf_instance = self._get_vnf_instance(context, id)
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'show',
target=self._get_policy_target(vnf_instance))
return self._view_builder.show(vnf_instance)
@ -586,8 +626,8 @@ class VnfLcmController(wsgi.Controller):
@api_common.validate_supported_params({'filter', 'nextpage_opaque_marker',
'all_records'})
def index(self, request):
if 'tacker.context' in request.environ:
context = request.environ['tacker.context']
context = request.environ['tacker.context']
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'index')
filters = request.GET.get('filter')
@ -608,6 +648,13 @@ class VnfLcmController(wsgi.Controller):
vnf_instances = objects.VnfInstanceList.get_by_filters(
request.context, filters=filters)
if CONF.oslo_policy.enhanced_tacker_policy:
vnf_instances = [vnf_instance for vnf_instance in vnf_instances
if context.can(
vnf_lcm_policies.VNFLCM % 'index',
target=self._get_policy_target(vnf_instance),
fatal=False)]
result = self._view_builder.index(vnf_instances)
res = webob.Response(content_type='application/json')
@ -653,6 +700,10 @@ class VnfLcmController(wsgi.Controller):
context = request.environ['tacker.context']
vnf_instance = self._get_vnf_instance(context, id)
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'delete',
target=self._get_policy_target(vnf_instance))
vnf = self._get_vnf(context, id)
self._delete(context, vnf_instance, vnf)
@ -686,7 +737,12 @@ class VnfLcmController(wsgi.Controller):
req_body, context=context)
# validate the vim connection id passed through request is exist or not
self._validate_vim_connection(context, instantiate_vnf_request)
vim_res = self._validate_vim_connection(
context, instantiate_vnf_request)
if instantiate_vnf_request.vim_connection_info:
instantiate_vnf_request.vim_connection_info[0].extra = vim_res.get(
'extra', {})
vnf_instance.task_state = fields.VnfInstanceTaskState.INSTANTIATING
vnf_instance.save()
@ -718,10 +774,14 @@ class VnfLcmController(wsgi.Controller):
@validation.schema(vnf_lcm.instantiate)
def instantiate(self, request, id, body):
context = request.environ['tacker.context']
context.can(vnf_lcm_policies.VNFLCM % 'instantiate')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'instantiate')
vnf = self._get_vnf(context, id)
vnf_instance = self._get_vnf_instance(context, id)
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'instantiate',
target=self._get_policy_target(vnf_instance))
return self._instantiate(context, vnf_instance, vnf, body)
@ -769,10 +829,14 @@ class VnfLcmController(wsgi.Controller):
@validation.schema(vnf_lcm.terminate)
def terminate(self, request, id, body):
context = request.environ['tacker.context']
context.can(vnf_lcm_policies.VNFLCM % 'terminate')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'terminate')
vnf = self._get_vnf(context, id)
vnf_instance = self._get_vnf_instance(context, id)
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'terminate',
target=self._get_policy_target(vnf_instance))
return self._terminate(context, vnf_instance, vnf, body)
@check_vnf_status_and_error_point(action="heal",
@ -825,10 +889,14 @@ class VnfLcmController(wsgi.Controller):
@validation.schema(vnf_lcm.heal)
def heal(self, request, id, body):
context = request.environ['tacker.context']
context.can(vnf_lcm_policies.VNFLCM % 'heal')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'heal')
vnf = self._get_vnf(context, id)
vnf_instance = self._get_vnf_instance(context, id)
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'heal',
target=self._get_policy_target(vnf_instance))
if vnf_instance.instantiation_state not in \
[fields.VnfInstanceState.INSTANTIATED]:
@ -868,7 +936,12 @@ class VnfLcmController(wsgi.Controller):
@wsgi.expected_errors((http_client.FORBIDDEN, http_client.NOT_FOUND))
def update(self, request, id, body):
context = request.environ['tacker.context']
context.can(vnf_lcm_policies.VNFLCM % 'update_vnf')
if CONF.oslo_policy.enhanced_tacker_policy:
vnf_instance = self._get_vnf_instance(context, id)
context.can(vnf_lcm_policies.VNFLCM % 'update_vnf',
target=self._get_policy_target(vnf_instance))
else:
context.can(vnf_lcm_policies.VNFLCM % 'update_vnf')
# get body
body_data = {}
@ -1356,7 +1429,8 @@ class VnfLcmController(wsgi.Controller):
http_client.NOT_FOUND, http_client.CONFLICT))
def scale(self, request, id, body):
context = request.environ['tacker.context']
context.can(vnf_lcm_policies.VNFLCM % 'scale')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'scale')
try:
vnf_info = self._vnfm_plugin.get_vnf(context, id)
@ -1364,6 +1438,9 @@ class VnfLcmController(wsgi.Controller):
return self._make_problem_detail(
'VNF IS NOT ACTIVE', 409, title='VNF IS NOT ACTIVE')
vnf_instance = self._get_vnf_instance(context, id)
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'scale',
target=self._get_policy_target(vnf_instance))
if not vnf_instance.instantiated_vnf_info.scale_status:
return self._make_problem_detail(
'NOT SCALE VNF', 409, title='NOT SCALE VNF')
@ -1374,6 +1451,8 @@ class VnfLcmController(wsgi.Controller):
except webob.exc.HTTPNotFound as inst_e:
return self._make_problem_detail(
str(inst_e), 404, title='VNF NOT FOUND')
except exceptions.PolicyNotAuthorized:
raise
except Exception as e:
LOG.error(traceback.format_exc())
return self._make_problem_detail(
@ -1831,10 +1910,14 @@ class VnfLcmController(wsgi.Controller):
@validation.schema(vnf_lcm.change_ext_conn)
def change_ext_conn(self, request, id, body):
context = request.environ['tacker.context']
context.can(vnf_lcm_policies.VNFLCM % 'change_ext_conn')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'change_ext_conn')
vnf = self._get_vnf(context, id)
vnf_instance = self._get_vnf_instance(context, id)
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_lcm_policies.VNFLCM % 'change_ext_conn',
target=self._get_policy_target(vnf_instance))
if (vnf_instance.instantiation_state !=
fields.VnfInstanceState.INSTANTIATED):
return self._make_problem_detail(

View File

@ -109,7 +109,8 @@ class VnfPkgmController(wsgi.Controller):
@wsgi.expected_errors((http_client.FORBIDDEN, http_client.NOT_FOUND))
def show(self, request, id):
context = request.environ['tacker.context']
context.can(vnf_package_policies.VNFPKGM % 'show')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'show')
# check if id is of type uuid format
if not uuidutils.is_uuid_like(id):
@ -120,18 +121,28 @@ class VnfPkgmController(wsgi.Controller):
vnf_package = vnf_package_obj.VnfPackage.get_by_id(
request.context, id, expected_attrs=[
"vnf_deployment_flavours", "vnfd", "vnf_artifacts"])
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'show',
target=self._get_policy_target(vnf_package))
except exceptions.VnfPackageNotFound:
msg = _("Can not find requested vnf package: %s") % id
raise webob.exc.HTTPNotFound(explanation=msg)
return self._view_builder.show(vnf_package)
def _get_policy_target(self, vnf_package):
vendor = None
if vnf_package.obj_attr_is_set('vnfd') and vnf_package.vnfd:
vendor = vnf_package.vnfd.get('vnf_provider')
return {'vendor': vendor}
@wsgi.response(http_client.OK)
@wsgi.expected_errors((http_client.BAD_REQUEST, http_client.FORBIDDEN))
@validation.query_schema(vnf_packages.query_params_v1)
def index(self, request):
if 'tacker.context' in request.environ:
context = request.environ['tacker.context']
context = request.environ['tacker.context']
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'index')
search_opts = {}
@ -173,6 +184,13 @@ class VnfPkgmController(wsgi.Controller):
vnf_packages = vnf_package_obj.VnfPackagesList.get_by_filters(
request.context, read_deleted='no', filters=filters)
if CONF.oslo_policy.enhanced_tacker_policy:
vnf_packages = [vnf_package for vnf_package in vnf_packages
if context.can(
vnf_package_policies.VNFPKGM % 'index',
target=self._get_policy_target(vnf_package),
fatal=False)]
results = self._view_builder.index(vnf_packages,
all_fields=all_fields,
exclude_fields=exclude_fields,
@ -206,9 +224,13 @@ class VnfPkgmController(wsgi.Controller):
http_client.CONFLICT))
def delete(self, request, id):
context = request.environ['tacker.context']
context.can(vnf_package_policies.VNFPKGM % 'delete')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'delete')
vnf_package = self._get_vnf_package(id, request)
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'delete',
target=self._get_policy_target(vnf_package))
if (vnf_package.operational_state ==
fields.PackageOperationalStateType.ENABLED or
@ -232,9 +254,13 @@ class VnfPkgmController(wsgi.Controller):
http_client.REQUESTED_RANGE_NOT_SATISFIABLE))
def fetch_vnf_package_content(self, request, id):
context = request.environ['tacker.context']
context.can(vnf_package_policies.VNFPKGM % 'fetch_package_content')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'fetch_package_content')
vnf_package = self._get_vnf_package(id, request)
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'fetch_package_content',
target=self._get_policy_target(vnf_package))
if vnf_package.onboarding_state != \
fields.PackageOnboardingStateType.ONBOARDED:
@ -463,9 +489,14 @@ class VnfPkgmController(wsgi.Controller):
@validation.schema(vnf_packages.patch)
def patch(self, request, id, body):
context = request.environ['tacker.context']
context.can(vnf_package_policies.VNFPKGM % 'patch')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'patch')
old_vnf_package = self._get_vnf_package(id, request)
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'patch',
target=self._get_policy_target(old_vnf_package))
vnf_package = old_vnf_package.obj_clone()
user_data = body.get('userDefinedData')
@ -517,7 +548,8 @@ class VnfPkgmController(wsgi.Controller):
http_client.INTERNAL_SERVER_ERROR))
def get_vnf_package_vnfd(self, request, id):
context = request.environ['tacker.context']
context.can(vnf_package_policies.VNFPKGM % 'get_vnf_package_vnfd')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'get_vnf_package_vnfd')
valid_accept_headers = ['application/zip', 'text/plain']
accept_headers = request.headers['Accept'].split(',')
@ -531,6 +563,9 @@ class VnfPkgmController(wsgi.Controller):
valid_accept_headers)})
vnf_package = self._get_vnf_package(id, request)
if CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'get_vnf_package_vnfd',
target=self._get_policy_target(vnf_package))
if vnf_package.onboarding_state != \
fields.PackageOnboardingStateType.ONBOARDED:
@ -574,7 +609,8 @@ class VnfPkgmController(wsgi.Controller):
def fetch_vnf_package_artifacts(self, request, id, artifact_path):
context = request.environ['tacker.context']
# get policy
context.can(vnf_package_policies.VNFPKGM % 'fetch_artifact')
if not CONF.oslo_policy.enhanced_tacker_policy:
context.can(vnf_package_policies.VNFPKGM % 'fetch_artifact')
# get vnf_package
if not uuidutils.is_uuid_like(id):
@ -585,6 +621,10 @@ class VnfPkgmController(wsgi.Controller):
vnf_package = vnf_package_obj.VnfPackage.get_by_id(
request.context, id,
expected_attrs=["vnf_artifacts"])
if CONF.oslo_policy.enhanced_tacker_policy:
# get policy
context.can(vnf_package_policies.VNFPKGM % 'fetch_artifact',
target=self._get_policy_target(vnf_package))
except exceptions.VnfPackageNotFound:
msg = _("Can not find requested vnf package: %s") % id
raise webob.exc.HTTPNotFound(explanation=msg)

View File

@ -174,6 +174,24 @@ def is_valid_ipv4(address):
return False
def is_valid_area(area):
"""Verify that the area attribute is valid.
Area attribute is an area-region pair. The value of this attribute should
be a string in the format of "area@region".
"""
if not area:
return False
if not isinstance(area, str):
return False
if '@' not in area:
return False
split_area = area.split('@')
if len(split_area) != 2:
return False
return split_area[0] and split_area[1]
def change_memory_unit(mem, to):
"""Change the memory value(mem) based on the unit('to') specified.

View File

@ -18,6 +18,7 @@ from oslo_config import cfg
from tacker.conf import conductor
from tacker.conf import coordination
from tacker.conf import policy
from tacker.conf import vnf_lcm
from tacker.conf import vnf_package
@ -29,3 +30,4 @@ vnf_lcm.register_opts(CONF)
conductor.register_opts(CONF)
coordination.register_opts(CONF)
glance_store.register_opts(CONF)
policy.register_opts(CONF)

34
tacker/conf/policy.py Normal file
View File

@ -0,0 +1,34 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
CONF = cfg.CONF
policy_opts = [
cfg.BoolOpt('enhanced_tacker_policy',
default=False,
help=_('Seconds between running periodic tasks '
'to cleanup residues of deleted vnf packages')),
]
def register_opts(conf):
conf.register_opts(policy_opts, group='oslo_policy')
def list_opts():
return {'oslo_policy': policy_opts}

View File

@ -148,11 +148,14 @@ class ContextBase(oslo_context.RequestContext):
:return: returns a non-False value (not necessarily "True") if
authorized and False if not authorized and fatal is False.
"""
if target is None:
target = {'project_id': self.tenant_id,
'user_id': self.user_id}
tgt = {
'project_id': self.tenant_id,
'user_id': self.user_id
}
if target is not None:
tgt.update(target)
try:
return policy.authorize(self, action, target)
return policy.authorize(self, action, tgt)
except exceptions.Forbidden:
if fatal:
raise

View File

@ -14,6 +14,7 @@
# under the License.
from collections import abc
import copy
import re
import sys
@ -28,6 +29,7 @@ from oslo_utils import importutils
from tacker._i18n import _
from tacker.api.v1 import attributes
from tacker.common import exceptions
from tacker.common.utils import is_valid_area
from tacker import policies
@ -60,10 +62,134 @@ def init(conf=cfg.CONF, policy_file=None):
_ENFORCER.load_rules()
def _pre_enhanced_policy_check(target, credentials):
"""Preprocesses target and credentials for enhanced tacker policy.
This method does the following things:
1) If an attribute of target is FALSE, assign '*' to this attribute.
Example::
target = {
'area': None,
'vendor': ''
}
after preprocess:
target = {
'area': '*',
'vendor': '*'
}
2) Convert special roles to enhanced policy attributes in credentials.
Example::
credentials = {
'roles': [
'AREA_area_A@region_A',
'VENDOR_company_A',
'NAMESPACE_default'
]
}
after preprocess:
credentials = {
'roles': [
'AREA_area_A@region_A',
'VENDOR_company_A',
'NAMESPACE_default'
],
'area': ['*', 'area_A@region_A'],
'vendor': ['*', 'company_A'],
'namespace: ['*', 'default']
}
3) Convert special value `all` to the corresponding attribute value in
target.
"""
if not cfg.CONF.oslo_policy.enhanced_tacker_policy:
return target, credentials
if target is None:
tgt = {}
else:
tgt = copy.copy(target)
# If an attribute of target is FALSE, assign '*' to this attribute.
for k, v in tgt.items():
if not v:
tgt[k] = '*'
convert_map = {
'AREA_': 'area',
'VENDOR_': 'vendor',
'NAMESPACE_': 'namespace'
}
user_attrs = {
'area': ['*'],
'vendor': ['*'],
'namespace': ['*']
}
# Convert special roles to enhanced policy attributes in credentials.
for role in credentials.get('roles'):
for prefix, key in convert_map.items():
if role.startswith(prefix):
attr = role[len(prefix):]
if attr:
user_attrs[key].append(attr)
common_keys = user_attrs.keys() & tgt.keys()
# Convert special value `all` to the corresponding attribute value in
# target.
for key in common_keys:
attrs = user_attrs.get(key)
if tgt.get(key) == '*':
continue
to_remove = []
if 'area' == key:
if not is_valid_area(tgt.get(key)):
continue
for attr in attrs:
if not is_valid_area(attr):
continue
if 'all@all' == attr:
# example:
# target = {'area': 'area_A@region_A'}
# then:
# 'all@all' -> 'area_A@region_A'
to_remove.append('all@all')
attrs.append(tgt.get(key))
elif attr.startswith('all@'):
to_remove.append(attr)
area, region = attr.split('@', 1)
t_area, t_region = tgt.get(key).split('@', 1)
if region == t_region:
# example:
# target = {'area': 'area_A@region_A'}
# then:
# 'all@region_A' -> 'area_A@region_A'
attrs.append(f'{t_area}@{region}')
# else:
# example:
# target = {'area': 'area_A@region_B'}
# then:
# 'all@region_A' -> to be removed.
else:
for attr in attrs:
if 'all' == attr:
# example:
# target = {'vendor': 'company_A'}
# then:
# 'all' -> 'company_A'
to_remove.append('all')
attrs.append(tgt.get(key))
for item in to_remove:
attrs.remove(item)
user_attrs.update(credentials)
return tgt, user_attrs
def authorize(context, action, target, do_raise=True, exc=None):
init()
credentials = context.to_policy_values()
target, credentials = _pre_enhanced_policy_check(target, credentials)
if not exc:
exc = exceptions.PolicyNotAuthorized
try:
@ -387,6 +513,11 @@ def check(context, action, target, plugin=None, might_not_exist=False,
action,
target,
pluralized)
target = copy.copy(target)
if 'area' not in target:
target.update({'area': target.get('extra', {}).get('area')})
target, credentials = _pre_enhanced_policy_check(target, credentials)
result = _ENFORCER.enforce(match_rule,
target,
credentials,
@ -422,6 +553,10 @@ def enforce(context, action, target, plugin=None, pluralized=None):
action,
target,
pluralized)
target = copy.copy(target)
if 'area' not in target:
target.update({'area': target.get('extra', {}).get('area')})
target, credentials = _pre_enhanced_policy_check(target, credentials)
try:
result = _ENFORCER.enforce(rule, target, credentials, action=action,
do_raise=True)

View File

@ -36,6 +36,20 @@ VNF_LCM_OP_OCCS_PATH = V2_PATH + '/vnf_lcm_op_occs'
VNF_LCM_OP_OCCS_ID_PATH = VNF_LCM_OP_OCCS_PATH + '/{vnfLcmOpOccId}'
SERVER_NOTIFICATION_PATH = CONF.server_notification.uri_path_prefix
DELAY_CHECK_POLICY_NAMES = (
POLICY_NAME.format('create'),
POLICY_NAME.format('index'),
POLICY_NAME.format('show'),
POLICY_NAME.format('delete'),
POLICY_NAME.format('update'),
POLICY_NAME.format('instantiate'),
POLICY_NAME.format('terminate'),
POLICY_NAME.format('scale'),
POLICY_NAME.format('heal'),
POLICY_NAME.format('change_ext_conn'),
POLICY_NAME.format('change_vnfpkg'),
)
rules = [
policy.DocumentedRuleDefault(
name=POLICY_NAME.format('api_versions'),

View File

@ -24,6 +24,8 @@ from tacker.common import exceptions as common_ex
from tacker import context
from tacker.sol_refactored.api import api_version
from tacker.sol_refactored.api.policies.vnflcm_v2 \
import DELAY_CHECK_POLICY_NAMES
from tacker.sol_refactored.common import config
from tacker.sol_refactored.common import exceptions as sol_ex
@ -159,7 +161,9 @@ class SolResource(object):
return
if action == 'reject':
return
request.context.can(self.policy_name.format(action))
if not (self.policy_name.format(action) in DELAY_CHECK_POLICY_NAMES
and config.CONF.oslo_policy.enhanced_tacker_policy):
request.context.can(self.policy_name.format(action))
def _dispatch(self, request, action, action_args):
controller_method = getattr(self.controller, action)

View File

@ -48,6 +48,7 @@ def vim_to_conn_info(vim):
region = vim['placement_attr']['regions'][0]
vim_auth = vim['vim_auth']
extra = vim.get('extra', {})
if vim['vim_type'] == "openstack":
# see. https://nfvwiki.etsi.org/index.php
@ -73,7 +74,8 @@ def vim_to_conn_info(vim):
vimId=vim['vim_id'],
vimType='ETSINFV.OPENSTACK_KEYSTONE.V_3',
interfaceInfo=interface_info,
accessInfo=access_info
accessInfo=access_info,
extra=extra
)
if vim['vim_type'] == "kubernetes": # k8s
if 'oidc_token_url' in vim_auth:
@ -107,7 +109,8 @@ def vim_to_conn_info(vim):
vimId=vim['vim_id'],
vimType=vim_type,
interfaceInfo=interface_info,
accessInfo=access_info
accessInfo=access_info,
extra=extra
)
raise sol_ex.SolException(sol_detail='not support vim type')

View File

@ -20,6 +20,7 @@ from oslo_log import log as logging
from oslo_utils import uuidutils
from tacker.sol_refactored.api import api_version
from tacker.sol_refactored.api.policies.vnflcm_v2 import POLICY_NAME
from tacker.sol_refactored.api.schemas import vnflcm_v2 as schema
from tacker.sol_refactored.api import validator
from tacker.sol_refactored.api import wsgi as sol_wsgi
@ -62,6 +63,10 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
pkg_info = self.nfvo_client.get_vnf_package_info_vnfd(
context, vnfd_id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('create'),
target={'vendor': pkg_info.vnfProvider})
if pkg_info.operationalState != "ENABLED":
raise sol_ex.VnfdIdNotEnabled(vnfd_id=vnfd_id)
@ -122,6 +127,11 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
insts = inst_utils.get_inst_all(request.context,
marker=pager.marker)
if config.CONF.oslo_policy.enhanced_tacker_policy:
insts = [inst for inst in insts if request.context.can(
POLICY_NAME.format('index'),
target=self._get_policy_target(inst),
fatal=False)]
resp_body = self._inst_view.detail_list(insts, filters,
selector, pager)
@ -130,7 +140,9 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def show(self, request, id):
inst = inst_utils.get_inst(request.context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
request.context.can(POLICY_NAME.format('show'),
target=self._get_policy_target(inst))
resp_body = self._inst_view.detail(inst)
return sol_wsgi.SolResponse(200, resp_body)
@ -139,7 +151,9 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def delete(self, request, id):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('delete'),
target=self._get_policy_target(inst))
if inst.instantiationState != 'NOT_INSTANTIATED':
raise sol_ex.VnfInstanceIsInstantiated(inst_id=id)
@ -174,7 +188,9 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def update(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('update'),
target=self._get_policy_target(inst))
lcmocc_utils.check_lcmocc_in_progress(context, id)
if (inst.instantiationState == 'NOT_INSTANTIATED'
and 'vimConnectionInfo' in body):
@ -215,11 +231,35 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
return sol_wsgi.SolResponse(202, None, location=location)
def _get_policy_target(self, vnf_instance):
vendor = vnf_instance.vnfProvider
area = None
if vnf_instance.obj_attr_is_set('vimConnectionInfo'):
for _, vim_conn_info in vnf_instance.vimConnectionInfo.items():
area = vim_conn_info.get('extra', {}).get('area')
if area:
break
namespace = None
if vnf_instance.obj_attr_is_set('metadata'):
namespace = vnf_instance.metadata.get('namespace')
target = {
'vendor': vendor,
'area': area,
'namespace': namespace
}
return target
@validator.schema(schema.InstantiateVnfRequest_V200, '2.0.0')
@coordinate.lock_vnf_instance('{id}')
def instantiate(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('instantiate'),
target=self._get_policy_target(inst))
if inst.instantiationState != 'NOT_INSTANTIATED':
raise sol_ex.VnfInstanceIsInstantiated(inst_id=id)
@ -254,6 +294,9 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def terminate(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('terminate'),
target=self._get_policy_target(inst))
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=id)
@ -289,6 +332,9 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def scale(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('scale'),
target=self._get_policy_target(inst))
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=id)
@ -330,6 +376,9 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def heal(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('heal'),
target=self._get_policy_target(inst))
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=id)
@ -367,6 +416,9 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def change_ext_conn(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('change_ext_conn'),
target=self._get_policy_target(inst))
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=id)
@ -425,6 +477,9 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
def change_vnfpkg(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if config.CONF.oslo_policy.enhanced_tacker_policy:
context.can(POLICY_NAME.format('change_vnfpkg'),
target=self._get_policy_target(inst))
vnfd_id = body['vnfdId']
if inst.instantiationState != 'INSTANTIATED':

View File

@ -13,13 +13,19 @@
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from oslo_config import cfg
from unittest import mock
from tacker import context
from tacker.sol_refactored.api.policies.vnflcm_v2 \
import POLICY_NAME
from tacker.sol_refactored.api import wsgi as sol_wsgi
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.tests.unit import base
@ddt.ddt
class TestWsgi(base.TestCase):
def test_response_too_big(self):
@ -46,3 +52,88 @@ class TestWsgi(base.TestCase):
request.headers = {}
self.assertRaises(sol_ex.APIVersionMissing,
resource._check_api_version, request, 'action')
@mock.patch.object(context.Context, 'can')
def test_delay_policy_check(self, mock_can):
cfg.CONF.set_override(
"enhanced_tacker_policy", True, group='oslo_policy')
resource = sol_wsgi.SolResource(sol_wsgi.SolAPIController(),
policy_name=POLICY_NAME)
request = mock.Mock()
request.context = context.Context()
request.headers = {}
delay_actions = [
'create',
'index',
'show',
'delete',
'update',
'instantiate',
'terminate',
'scale',
'heal',
'change_ext_conn',
'change_vnfpkg'
]
for action in delay_actions:
resource._check_policy(request, action)
mock_can.assert_not_called()
@ddt.data('api_versions',
'subscription_create',
'subscription_list',
'subscription_show',
'subscription_delete',
'lcm_op_occ_list',
'lcm_op_occ_show',
'lcm_op_occ_retry',
'lcm_op_occ_rollback',
'lcm_op_occ_fail',
'lcm_op_occ_delete', )
@mock.patch.object(context.Context, 'can')
def test_not_delay_policy_check(self, action, mock_can):
cfg.CONF.set_override(
"enhanced_tacker_policy", True, group='oslo_policy')
resource = sol_wsgi.SolResource(sol_wsgi.SolAPIController(),
policy_name=POLICY_NAME)
request = mock.Mock()
request.context = context.Context()
request.headers = {}
resource._check_policy(request, action)
mock_can.assert_called_once()
@ddt.data('create',
'index',
'show',
'delete',
'update',
'instantiate',
'terminate',
'scale',
'heal',
'change_ext_conn',
'change_vnfpkg',
'api_versions',
'subscription_create',
'subscription_list',
'subscription_show',
'subscription_delete',
'lcm_op_occ_list',
'lcm_op_occ_show',
'lcm_op_occ_retry',
'lcm_op_occ_rollback',
'lcm_op_occ_fail',
'lcm_op_occ_delete', )
@mock.patch.object(context.Context, 'can')
def test_enhanced_policy_is_false(self, action, mock_can):
cfg.CONF.set_override(
"enhanced_tacker_policy", False, group='oslo_policy')
resource = sol_wsgi.SolResource(sol_wsgi.SolAPIController(),
policy_name=POLICY_NAME)
request = mock.Mock()
request.context = context.Context()
request.headers = {}
resource._check_policy(request, action)
mock_can.assert_called_once()

View File

@ -12,14 +12,21 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from datetime import datetime
import ddt
from http import client as http_client
import requests
from unittest import mock
from oslo_policy import policy as oslo_policy
from oslo_utils import uuidutils
from tacker.common.exceptions import PolicyNotAuthorized
from tacker import context
from tacker import policy
from tacker.sol_refactored.api import api_version
from tacker.sol_refactored.api.policies.vnflcm_v2 import POLICY_NAME
from tacker.sol_refactored.common import config
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import lcm_op_occ_utils as lcmocc_utils
@ -35,6 +42,9 @@ from tacker.sol_refactored.objects.v2 import fields
from tacker.tests.unit.db import base as db_base
objects.register_all()
_change_ext_conn_req_example = {
"extVirtualLinks": [
{
@ -111,6 +121,354 @@ SAMPLE_VNFD_ID = "b1bb0ce7-ebca-4fa7-95ed-4840d7000000"
CONF = config.CONF
def get_test_data_policy_instantiate():
rules = {POLICY_NAME.format('instantiate'): "vendor:%(vendor)s"}
test_data = [
# 'expected_status_code': http_client.ACCEPTED
{
'vnf_instance_updates': {'vnfProvider': 'provider_A'},
'rules': rules,
'roles': ['VENDOR_provider_A'],
'expected_status_code': http_client.ACCEPTED
},
{
'vnf_instance_updates': {'vnfProvider': 'provider_A'},
'rules': rules,
'roles': ['VENDOR_all'],
'expected_status_code': http_client.ACCEPTED
},
# 'expected_status_code': http_client.FORBIDDEN
{
'vnf_instance_updates': {'vnfProvider': 'provider_A'},
'rules': rules,
'roles': [],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': {'vnfProvider': 'provider_A'},
'rules': rules,
'roles': ['VENDOR_provider_B'],
'expected_status_code': http_client.FORBIDDEN
},
]
return test_data
def get_test_data_policy_vnf_instantiated(action, success_status_code):
vim_connection_info_area_a_region_a = objects.VimConnectionInfo(
id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vimId='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vimType='openstack',
access_info={"key1": 'value1', "key2": 'value2'},
extra={'area': 'area_A@region_A'}
)
vim_connection_info_without_area = objects.VimConnectionInfo(
id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vimId='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vimType='openstack',
access_info={"key1": 'value1', "key2": 'value2'}
)
vnf_instance_updates = {
'vnfProvider': 'provider_A',
'metadata': {"namespace": "namespace_A"},
'vimConnectionInfo': {'vim1': vim_connection_info_area_a_region_a}
}
vnf_instance_updates_without_namespace = {
'vnfProvider': 'provider_A',
'vimConnectionInfo': {'vim1': vim_connection_info_area_a_region_a}
}
vnf_instance_updates_without_area = {
'vnfProvider': 'provider_A',
'metadata': {"namespace": "namespace_A"},
'vimConnectionInfo': {'vim1': vim_connection_info_without_area}
}
vnf_instance_updates_without_area_namespace = {
'vnfProvider': 'provider_A',
'vimConnectionInfo': {'vim1': vim_connection_info_without_area}
}
rule_area_vendor_namespace = {
POLICY_NAME.format(action):
"area:%(area)s and vendor:%(vendor)s and "
"namespace:%(namespace)s"
}
rule_vendor = {
POLICY_NAME.format(action): "vendor:%(vendor)s"
}
test_data = [
# 'expected_status_code': success_status_code
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_all@all', 'VENDOR_all', 'NAMESPACE_all'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_all@region_A', 'VENDOR_all', 'NAMESPACE_all'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates': vnf_instance_updates_without_namespace,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates': vnf_instance_updates_without_area,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates':
vnf_instance_updates_without_area_namespace,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_vendor,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': success_status_code
},
# 'expected_status_code': http_client.FORBIDDEN
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_B@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_B',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_B'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_all@region_B',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_vendor,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_B',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_vendor,
'roles': [
'AREA_area_A@region_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates':
vnf_instance_updates_without_area_namespace,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_B@region_A',
'VENDOR_provider_B',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates':
vnf_instance_updates_without_area_namespace,
'rules': rule_area_vendor_namespace,
'roles': [
'VENDOR_provider_B',
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates':
vnf_instance_updates_without_area_namespace,
'rules': rule_vendor,
'roles': [
'VENDOR_provider_B',
],
'expected_status_code': http_client.FORBIDDEN
}
]
return test_data
def get_test_data_policy_index():
vim_connection_info_area_a_region_a = objects.VimConnectionInfo(
id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vimId='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vimType='openstack',
access_info={"key1": 'value1', "key2": 'value2'},
extra={'area': 'area_A@region_A'}
)
vim_connection_info_area_b_region_a = objects.VimConnectionInfo(
id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vimId='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vimType='openstack',
access_info={"key1": 'value1', "key2": 'value2'},
extra={'area': 'area_B@region_A'}
)
vnf_inst_fields = {
'id': uuidutils.generate_uuid(),
'vnfdId': uuidutils.generate_uuid(),
'vnfProductName': 'product name',
'vnfSoftwareVersion': 'software version',
'vnfdVersion': 'vnfd version',
'instantiationState': 'INSTANTIATED'
}
vnf_instance_updates_area_a = {
'id': uuidutils.generate_uuid(),
'vnfProvider': 'provider_A',
'vimConnectionInfo': {'vim1': vim_connection_info_area_a_region_a}
}
vnf_instance_updates_area_a.update(vnf_inst_fields)
vnf_instance_updates_area_b = {
'id': uuidutils.generate_uuid(),
'vnfProvider': 'provider_A',
'vimConnectionInfo': {'vim1': vim_connection_info_area_b_region_a}
}
vnf_instance_updates_area_b.update(vnf_inst_fields)
vnf_instance_updates_provider_a = {
'id': uuidutils.generate_uuid(),
'vnfProvider': 'provider_A',
}
vnf_instance_updates_provider_a.update(vnf_inst_fields)
vnf_instance_updates_provider_b = {
'id': uuidutils.generate_uuid(),
'vnfProvider': 'provider_B',
}
vnf_instance_updates_provider_b.update(vnf_inst_fields)
vnf_instances_a = objects.VnfInstanceV2(**vnf_instance_updates_area_a)
vnf_instances_b = objects.VnfInstanceV2(**vnf_instance_updates_area_b)
vnf_instances_provider_a = objects.VnfInstanceV2(
**vnf_instance_updates_provider_a)
vnf_instances_provider_b = objects.VnfInstanceV2(
**vnf_instance_updates_provider_b)
rule_area_vendor_namespace = {
POLICY_NAME.format('index'):
"area:%(area)s and vendor:%(vendor)s and "
"namespace:%(namespace)s"
}
test_data = [
{
'vnf_instance_list': [vnf_instances_a, vnf_instances_b],
'rules': rule_area_vendor_namespace,
'roles': ['AREA_area_A@region_A', 'VENDOR_provider_A'],
'expected_vnf_inst_ids': [vnf_instances_a.id]
},
{
'vnf_instance_list': [vnf_instances_a, vnf_instances_b],
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all@all', 'VENDOR_provider_A'],
'expected_vnf_inst_ids': [vnf_instances_a.id, vnf_instances_b.id]
},
{
'vnf_instance_list': [
vnf_instances_provider_a, vnf_instances_provider_b],
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all@all', 'VENDOR_provider_A'],
'expected_vnf_inst_ids': [vnf_instances_provider_a.id]
}
]
return test_data
class TestVnflcmV2(db_base.SqlTestCase):
def setUp(self):
@ -1243,3 +1601,476 @@ class TestVnflcmV2(db_base.SqlTestCase):
self.assertRaises(sol_ex.SolValidationError,
self.controller.heal, request=self.request, id=inst_id,
body=body)
@ddt.ddt
class TestVnflcmV2EnhancedPolicy(TestVnflcmV2):
def setUp(self):
super(TestVnflcmV2EnhancedPolicy, self).setUp()
CONF.set_override(
"enhanced_tacker_policy", True, group='oslo_policy')
def _create_inst_and_lcmocc(self, inst_state, op_state,
vnf_inst_updates=None):
inst, lcmocc = self._set_inst_and_lcmocc(
inst_state, op_state, vnf_inst_updates=vnf_inst_updates)
inst.create(self.context)
lcmocc.create(self.context)
return inst.id, lcmocc.id
def _set_inst_and_lcmocc(
self, inst_state, op_state, vnf_inst_updates=None):
vnf_inst_fields = {
# required fields
'id': uuidutils.generate_uuid(),
'vnfdId': uuidutils.generate_uuid(),
'vnfProvider': 'provider',
'vnfProductName': 'product name',
'vnfSoftwareVersion': 'software version',
'vnfdVersion': 'vnfd version',
'instantiationState': inst_state
}
if vnf_inst_updates:
vnf_inst_fields.update(vnf_inst_updates)
inst = objects.VnfInstanceV2(**vnf_inst_fields)
req = {"flavourId": "simple"} # instantiate request
lcmocc = objects.VnfLcmOpOccV2(
# required fields
id=uuidutils.generate_uuid(),
operationState=op_state,
stateEnteredTime=datetime.utcnow(),
startTime=datetime.utcnow(),
vnfInstanceId=inst.id,
operation=fields.LcmOperationType.INSTANTIATE,
isAutomaticInvocation=False,
isCancelPending=False,
operationParams=req
)
return inst, lcmocc
def _prepare_db_for_scale_param_check(self, scale_status,
max_scale_levels, vnf_inst_updates=None):
vnf_inst_fields = {
# required fields
'id': uuidutils.generate_uuid(),
'vnfdId': uuidutils.generate_uuid(),
'vnfProvider': 'provider',
'vnfProductName': 'product name',
'vnfSoftwareVersion': 'software version',
'vnfdVersion': 'vnfd version',
'instantiationState': 'INSTANTIATED'
}
if vnf_inst_updates:
vnf_inst_fields.update(vnf_inst_updates)
inst = objects.VnfInstanceV2(**vnf_inst_fields)
inst.instantiatedVnfInfo = objects.VnfInstanceV2_InstantiatedVnfInfo(
flavourId='small',
vnfState='STARTED',
scaleStatus=scale_status,
maxScaleLevels=max_scale_levels
)
inst.create(self.context)
return inst.id
def _prepare_db_for_change_vnfpkg_param(self, vnf_inst_updates=None):
vnf_inst_fields = {
# required fields
'id': uuidutils.generate_uuid(),
'vnfdId': uuidutils.generate_uuid(),
'vnfProvider': 'provider',
'vnfProductName': 'product name',
'vnfSoftwareVersion': 'software version',
'vnfdVersion': 'vnfd version',
'instantiationState': 'INSTANTIATED'
}
if vnf_inst_updates:
vnf_inst_fields.update(vnf_inst_updates)
inst = objects.VnfInstanceV2(**vnf_inst_fields)
inst.instantiatedVnfInfo = objects.VnfInstanceV2_InstantiatedVnfInfo(
flavourId='small',
vnfState='STARTED',
vnfcResourceInfo=[
objects.VnfcResourceInfoV2(
id="VDU1-vnfc_res_info_id_VDU1",
vduId="VDU1"
)
]
)
inst.vimConnectionInfo = {
"vim1": objects.VimConnectionInfo.from_dict(
_vim_connection_info_example)}
inst.create(self.context)
return inst.id
def _fake_request(self, roles):
request = requests.Request()
ctx = context.Context('fake', 'fake', roles=roles)
ctx.api_version = api_version.APIVersion("2.0.0")
request.context = ctx
return request
def _overwrite_policy(self, rules):
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
@mock.patch.object(inst_utils, 'get_inst_all')
def test_index(self, mock_inst):
request = requests.Request()
request.context = self.context
request.GET = {'filter': f'(eq,vnfdId,{SAMPLE_VNFD_ID})'}
mock_inst.return_value = [objects.VnfInstanceV2(
id='inst-1', vnfdId=SAMPLE_VNFD_ID, vnfProvider='Company',
instantiationState='NOT_INSTANTIATED')]
result = self.controller.index(request)
self.assertEqual(200, result.status)
# no filter
request.GET = {}
result = self.controller.index(request)
self.assertEqual(200, result.status)
@mock.patch.object(inst_utils, 'get_inst')
def test_show(self, mock_inst):
request = requests.Request()
request.context = self.context
mock_inst.return_value = objects.VnfInstanceV2(
id='inst-1', vnfdId=SAMPLE_VNFD_ID, vnfProvider='Company',
instantiationState='NOT_INSTANTIATED')
result = self.controller.show(request, 'inst-1')
self.assertEqual(200, result.status)
@ddt.data(*get_test_data_policy_instantiate())
@ddt.unpack
@mock.patch.object(vim_utils, 'get_vim')
@mock.patch.object(conductor_rpc_v2.VnfLcmRpcApiV2, 'start_lcm_op')
def test_instantiate_enhanced_policy(self, mock_start,
mock_vim, vnf_instance_updates, rules, roles,
expected_status_code):
self._overwrite_policy(rules)
inst_id, _ = self._create_inst_and_lcmocc('NOT_INSTANTIATED',
fields.LcmOperationStateType.COMPLETED,
vnf_inst_updates=vnf_instance_updates)
body = {
"flavourId": "small",
"vimConnectionInfo": {
"vim1": {
"vimId": "vim_id_1",
"vimType": "ETSINFV.OPENSTACK_KEYSTONE.V_3"
}
}
}
mock_vim.return_value = objects.VimConnectionInfo.from_dict(
_vim_connection_info_example)
try:
result = self.controller.instantiate(
request=self._fake_request(roles), id=inst_id, body=body)
self.assertEqual(expected_status_code, result.status)
except PolicyNotAuthorized:
if expected_status_code != http_client.FORBIDDEN:
raise
@mock.patch.object(vim_utils, 'get_vim')
@mock.patch.object(conductor_rpc_v2.VnfLcmRpcApiV2, 'start_lcm_op')
def test_instantiate_add_area(self, mock_start, mock_vim):
inst_id, _ = self._create_inst_and_lcmocc(
'NOT_INSTANTIATED', fields.LcmOperationStateType.COMPLETED)
body = {
"flavourId": "small",
"vimConnectionInfo": {
"vim1": {
"vimId": "vim_id_1",
"vimType": "ETSINFV.OPENSTACK_KEYSTONE.V_3"
}
}
}
vim = copy.copy(_vim_connection_info_example)
vim.update({'extra': {'area': 'area_A@region_A'}})
mock_vim.return_value = objects.VimConnectionInfo.from_dict(vim)
result = self.controller.instantiate(
request=self.request, id=inst_id, body=body)
self.assertEqual(202, result.status)
@mock.patch.object(vim_utils, 'get_vim')
@mock.patch.object(conductor_rpc_v2.VnfLcmRpcApiV2, 'start_lcm_op')
def test_instantiate_with_area_in_vim_conn_info(
self, mock_start, mock_vim):
inst_id, _ = self._create_inst_and_lcmocc(
'NOT_INSTANTIATED', fields.LcmOperationStateType.COMPLETED)
body = {
"flavourId": "small",
"vimConnectionInfo": {
"vim1": {
"vimId": "vim_id_1",
"vimType": "ETSINFV.OPENSTACK_KEYSTONE.V_3",
"interfaceInfo": {"endpoint": "http://127.0.0.1/identity"},
"accessInfo": {
"username": "nfv_user",
"region": "RegionOne",
"password": "devstack",
"project": "nfv",
"projectDomain": "Default",
"userDomain": "Default"
},
"extra": {"area": "area_A@region_A"}
}
}
}
vim = copy.copy(_vim_connection_info_example)
mock_vim.return_value = objects.VimConnectionInfo.from_dict(vim)
result = self.controller.instantiate(
request=self.request, id=inst_id, body=body)
self.assertEqual(202, result.status)
@ddt.data(*get_test_data_policy_vnf_instantiated(
'terminate', http_client.ACCEPTED))
@ddt.unpack
@mock.patch.object(conductor_rpc_v2.VnfLcmRpcApiV2, 'start_lcm_op')
def test_terminate_enhanced_policy(self, mock_start, vnf_instance_updates,
rules, roles, expected_status_code):
self._overwrite_policy(rules)
inst_id, _ = self._create_inst_and_lcmocc(
'INSTANTIATED', fields.LcmOperationStateType.COMPLETED,
vnf_inst_updates=vnf_instance_updates)
body = {"terminationType": "FORCEFUL"}
try:
result = self.controller.terminate(
request=self._fake_request(roles), id=inst_id,
body=body)
self.assertEqual(202, result.status)
except PolicyNotAuthorized:
if expected_status_code != http_client.FORBIDDEN:
raise
@ddt.data(*get_test_data_policy_vnf_instantiated(
'heal', http_client.ACCEPTED))
@ddt.unpack
@mock.patch.object(conductor_rpc_v2.VnfLcmRpcApiV2, 'start_lcm_op')
def test_heal_enhanced_policy(self, mock_start, vnf_instance_updates,
rules, roles, expected_status_code):
self._overwrite_policy(rules)
inst_id, _ = self._create_inst_and_lcmocc('INSTANTIATED',
fields.LcmOperationStateType.COMPLETED,
vnf_inst_updates=vnf_instance_updates)
body = {"cause": "Healing VNF instance"}
try:
result = self.controller.heal(
request=self._fake_request(roles), id=inst_id,
body=body)
self.assertEqual(http_client.ACCEPTED, result.status)
except PolicyNotAuthorized:
if expected_status_code != http_client.FORBIDDEN:
raise
@ddt.data(*get_test_data_policy_vnf_instantiated(
'delete', http_client.NO_CONTENT))
@ddt.unpack
@mock.patch.object(nfvo_client.NfvoClient, 'send_inst_delete_notification')
def test_delete_enhanced_policy(self, mock_delete, vnf_instance_updates,
rules, roles, expected_status_code):
self._overwrite_policy(rules)
inst_id, _ = self._create_inst_and_lcmocc('NOT_INSTANTIATED',
fields.LcmOperationStateType.COMPLETED,
vnf_inst_updates=vnf_instance_updates)
try:
result = self.controller.delete(
self._fake_request(roles), id=inst_id)
self.assertEqual(http_client.NO_CONTENT, result.status)
except PolicyNotAuthorized:
if expected_status_code != http_client.FORBIDDEN:
raise
@ddt.data(*get_test_data_policy_vnf_instantiated(
'show', http_client.OK))
@ddt.unpack
@mock.patch.object(inst_utils, 'get_inst')
def test_show_enhanced_policy(self, mock_inst, vnf_instance_updates,
rules, roles, expected_status_code):
self._overwrite_policy(rules)
mock_inst.return_value = objects.VnfInstanceV2(
id='inst-1', vnfdId=SAMPLE_VNFD_ID,
instantiationState='NOT_INSTANTIATED', **vnf_instance_updates)
try:
result = self.controller.show(self._fake_request(roles), 'inst-1')
self.assertEqual(200, result.status)
except PolicyNotAuthorized:
if expected_status_code != http_client.FORBIDDEN:
raise
@ddt.data(*get_test_data_policy_vnf_instantiated(
'scale', http_client.ACCEPTED))
@ddt.unpack
@mock.patch.object(conductor_rpc_v2.VnfLcmRpcApiV2, 'start_lcm_op')
def test_scale_enhanced_policy(self, mock_start, vnf_instance_updates,
rules, roles, expected_status_code):
self._overwrite_policy(rules)
scale_status = [
objects.ScaleInfoV2(
aspectId="aspect_1",
scaleLevel=1
)
]
max_scale_levels = [
objects.ScaleInfoV2(
aspectId="aspect_1",
scaleLevel=3
)
]
inst_id = self._prepare_db_for_scale_param_check(
scale_status, max_scale_levels,
vnf_inst_updates=vnf_instance_updates)
body = {"aspectId": "aspect_1", "type": "SCALE_OUT",
"numberOfSteps": 1}
try:
result = self.controller.scale(
request=self._fake_request(roles),
id=inst_id,
body=body)
self.assertEqual(http_client.ACCEPTED, result.status)
except PolicyNotAuthorized:
if expected_status_code != http_client.FORBIDDEN:
raise
@ddt.data(*get_test_data_policy_vnf_instantiated(
'update', http_client.ACCEPTED))
@ddt.unpack
@mock.patch.object(conductor_rpc_v2.VnfLcmRpcApiV2, 'modify_vnfinfo')
def test_update_enhanced_policy(self, mock_modify, vnf_instance_updates,
rules, roles, expected_status_code):
self._overwrite_policy(rules)
inst_fields = {
# required fields
'id': uuidutils.generate_uuid(),
'vnfdId': uuidutils.generate_uuid(),
'vnfProvider': 'provider',
'vnfProductName': 'product name',
'vnfSoftwareVersion': 'software version',
'vnfdVersion': 'vnfd version',
'instantiationState': 'INSTANTIATED'
}
if vnf_instance_updates:
inst_fields.update(vnf_instance_updates)
inst = objects.VnfInstanceV2(**inst_fields)
inst.instantiatedVnfInfo = objects.VnfInstanceV2_InstantiatedVnfInfo(
flavourId='small',
vnfState='STARTED',
vnfcInfo=[
objects.VnfcInfoV2(
id="VDU1-vnfc_res_info_id_VDU1",
vduId="VDU1",
vnfcResourceInfoId="vnfc_res_info_id_VDU1",
vnfcState="STARTED"
)
]
)
inst.create(self.context)
body = {
"vnfcInfoModifications": [
{
"id": "VDU1-vnfc_res_info_id_VDU1",
"vnfcConfigurableProperties": {"key": "value"}
}
]
}
try:
result = self.controller.update(
request=self._fake_request(roles), id=inst.id, body=body)
self.assertEqual(202, result.status)
except PolicyNotAuthorized:
if expected_status_code != http_client.FORBIDDEN:
raise
@ddt.data(*get_test_data_policy_vnf_instantiated(
'change_ext_conn', http_client.ACCEPTED))
@ddt.unpack
@mock.patch.object(conductor_rpc_v2.VnfLcmRpcApiV2, 'start_lcm_op')
def test_change_ext_conn_enhanced_policy(self, mock_start,
vnf_instance_updates, rules, roles, expected_status_code):
self._overwrite_policy(rules)
inst_id, _ = self._create_inst_and_lcmocc('INSTANTIATED',
fields.LcmOperationStateType.COMPLETED,
vnf_inst_updates=vnf_instance_updates)
try:
result = self.controller.change_ext_conn(
request=self._fake_request(roles), id=inst_id,
body=_change_ext_conn_req_example)
self.assertEqual(202, result.status)
except PolicyNotAuthorized:
if expected_status_code != http_client.FORBIDDEN:
raise
@ddt.data(*get_test_data_policy_vnf_instantiated(
'change_vnfpkg', http_client.ACCEPTED))
@ddt.unpack
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnf_package_info_vnfd')
@mock.patch.object(conductor_rpc_v2.VnfLcmRpcApiV2, 'start_lcm_op')
def test_change_vnfpkg_pkg_enhanced_policy(
self, mock_start, mocked_get_vnf_package_info_vnfd,
vnf_instance_updates, rules, roles, expected_status_code):
self._overwrite_policy(rules)
vnfd_id = uuidutils.generate_uuid()
inst_id = self._prepare_db_for_change_vnfpkg_param(
vnf_inst_updates=vnf_instance_updates)
body = {
"vnfdId": vnfd_id,
"additionalParams": {
"upgrade_type": "RollingUpdate",
"vdu_params": [{
"vdu_id": "VDU1",
"new_vnfc_param": {
"username": "test",
"password": "test",
"cp_name": "VDU1_CP1"
},
"old_vnfc_param": {
"username": "test",
"password": "test",
"cp_name": "VDU1_CP1"
}
}]
}
}
pkg_info = objects.VnfPkgInfoV2(
id=uuidutils.generate_uuid(),
vnfdId=vnfd_id,
vnfProvider="provider",
vnfProductName="product",
vnfSoftwareVersion="software version",
vnfdVersion="vnfd version",
operationalState="ENABLED"
)
mocked_get_vnf_package_info_vnfd.return_value = pkg_info
try:
result = self.controller.change_vnfpkg(
request=self._fake_request(roles), id=inst_id, body=body)
self.assertEqual(202, result.status)
except PolicyNotAuthorized:
if expected_status_code != http_client.FORBIDDEN:
raise
@ddt.data(*get_test_data_policy_index())
@ddt.unpack
@mock.patch.object(inst_utils, 'get_inst_all')
def test_index_enhanced_policy(self, mock_inst, vnf_instance_list,
rules, roles, expected_vnf_inst_ids):
self._overwrite_policy(rules)
mock_inst.return_value = vnf_instance_list
request = self._fake_request(roles)
request.GET = {}
result = self.controller.index(request)
self.assertEqual(200, result.status)
result = result.body
import logging
log = logging.getLogger(__name__)
log.debug(f'tag_body: {result}')
self.assertEqual(
expected_vnf_inst_ids, [inst.get('id') for inst in result])

View File

@ -15,9 +15,11 @@
from copy import deepcopy
import datetime
from http import client as http_client
import iso8601
import json
import os
from oslo_utils import uuidutils
import webob
from tacker.api.vnflcm.v1.router import VnflcmAPIRouter
@ -32,6 +34,7 @@ from tacker.objects.instantiate_vnf_req import ExtVirtualLinkData
from tacker.objects.instantiate_vnf_req import InstantiateVnfRequest
from tacker.objects import scale_vnf_request
from tacker.objects.vim_connection import VimConnectionInfo
from tacker.policies import vnf_lcm as vnf_lcm_policies
from tacker.tests import constants
from tacker.tests import uuidsentinel
from tacker import wsgi
@ -106,9 +109,9 @@ def fake_vnf_package_vnfd_model_dict(**updates):
return vnfd
def return_vnf_package_vnfd():
def return_vnf_package_vnfd(**updates):
model_obj = models.VnfPackageVnfd()
model_obj.update(fake_vnf_package_vnfd_model_dict())
model_obj.update(fake_vnf_package_vnfd_model_dict(**updates))
return model_obj
@ -1896,3 +1899,577 @@ def fake_subscription_response(**updates):
data = _subscription_links(data)
return data
def get_test_data_policy_create():
rules = {vnf_lcm_policies.VNFLCM % 'create': "vendor:%(vendor)s"}
test_data = [
# 'expected_status_code': http_client.OK
{
'vnfd_updates': {'vnf_provider': 'provider_A'},
'rules': rules,
'roles': ['VENDOR_provider_A'],
'expected_status_code': http_client.CREATED
},
{
'vnfd_updates': {'vnf_provider': ''},
'rules': rules,
'roles': ['VENDOR_'],
'expected_status_code': http_client.CREATED
},
{
'vnfd_updates': {'vnf_provider': 'provider_A'},
'rules': rules,
'roles': ['VENDOR_all'],
'expected_status_code': http_client.CREATED
},
{
'vnfd_updates': {'vnf_provider': 'provider_B'},
'rules': rules,
'roles': ['VENDOR_provider_B'],
'expected_status_code': http_client.CREATED
},
{
'vnfd_updates': {'vnf_provider': 'provider_B'},
'rules': rules,
'roles': ['VENDOR_all'],
'expected_status_code': http_client.CREATED
},
{
'vnfd_updates': {'vnf_provider': 'provider_B'},
'rules': {vnf_lcm_policies.VNFLCM % 'create': "@"},
'roles': [],
'expected_status_code': http_client.CREATED
},
# 'expected_status_code': http_client.FORBIDDEN
{
'vnfd_updates': {'vnf_provider': 'provider_A'},
'rules': rules,
'roles': ['VENDOR_B'],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnfd_updates': {'vnf_provider': 'provider_B'},
'rules': rules,
'roles': ['VENDOR_A'],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnfd_updates': {'vnf_provider': 'provider_A'},
'rules': rules,
'roles': [],
'expected_status_code': http_client.FORBIDDEN
},
]
return test_data
def get_test_data_policy_index():
vim_connection_info_area_a_region_a = VimConnectionInfo(
id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vim_id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vim_type='openstack',
access_info={"key1": 'value1', "key2": 'value2'},
extra={'area': 'area_A@region_A'}
)
vim_connection_info_area_b_region_a = VimConnectionInfo(
id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vim_id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vim_type='openstack',
access_info={"key1": 'value1', "key2": 'value2'},
extra={'area': 'area_B@region_A'}
)
vnf_instance_updates_area_a = {
'id': uuidutils.generate_uuid(),
'vnf_provider': 'provider_A',
'vim_connection_info': [vim_connection_info_area_a_region_a]
}
vnf_instance_updates_area_b = {
'id': uuidutils.generate_uuid(),
'vnf_provider': 'provider_A',
'vim_connection_info': [vim_connection_info_area_b_region_a]
}
vnf_instance_updates_provider_a = {
'id': uuidutils.generate_uuid(),
'vnf_provider': 'provider_A',
}
vnf_instance_updates_provider_b = {
'id': uuidutils.generate_uuid(),
'vnf_provider': 'provider_B',
}
vnf_instances_a = return_vnf_instance(**vnf_instance_updates_area_a)
vnf_instances_b = return_vnf_instance(**vnf_instance_updates_area_b)
vnf_instances_provider_a = return_vnf_instance(
**vnf_instance_updates_provider_a)
vnf_instances_provider_b = return_vnf_instance(
**vnf_instance_updates_provider_b)
rule_area_vendor_namespace = {
vnf_lcm_policies.VNFLCM % 'index':
"area:%(area)s and vendor:%(vendor)s and "
"namespace:%(namespace)s"
}
test_data = [
{
'vnf_instance_list': [vnf_instances_a, vnf_instances_b],
'rules': rule_area_vendor_namespace,
'roles': ['AREA_area_A@region_A', 'VENDOR_provider_A'],
'expected_vnf_inst_ids': [vnf_instances_a.id]
},
{
'vnf_instance_list': [vnf_instances_a, vnf_instances_b],
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all@all', 'VENDOR_provider_A'],
'expected_vnf_inst_ids': [vnf_instances_a.id, vnf_instances_b.id]
},
{
'vnf_instance_list': [
vnf_instances_provider_a, vnf_instances_provider_b],
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all@all', 'VENDOR_provider_A'],
'expected_vnf_inst_ids': [vnf_instances_provider_a.id]
}
]
return test_data
def get_test_data_policy_instantiate():
test_data = [
# 'expected_status_code': http_client.ACCEPTED
{
'vnf_instance_updates': {'vnf_provider': 'provider_A'},
'rules': {
"os_nfv_orchestration_api:vnf_instances:instantiate":
"vendor:%(vendor)s"
},
'roles': ['VENDOR_provider_A'],
'expected_status_code': http_client.ACCEPTED
},
{
'vnf_instance_updates': {'vnf_provider': 'provider_A'},
'rules': {
"os_nfv_orchestration_api:vnf_instances:instantiate":
"vendor:%(vendor)s"
},
'roles': ['VENDOR_all'],
'expected_status_code': http_client.ACCEPTED
},
# 'expected_status_code': http_client.FORBIDDEN
{
'vnf_instance_updates': {'vnf_provider': 'provider_A'},
'rules': {
"os_nfv_orchestration_api:vnf_instances:instantiate":
"vendor:%(vendor)s"
},
'roles': [],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': {'vnf_provider': 'provider_A'},
'rules': {
"os_nfv_orchestration_api:vnf_instances:instantiate":
"vendor:%(vendor)s"
},
'roles': ['VENDOR_provider_B'],
'expected_status_code': http_client.FORBIDDEN
},
]
return test_data
def get_test_data_policy_vnf_not_instantiated(action):
vim_connection_info = {
"id": 'f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
"vim_id": 'f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
"vim_type": 'openstack',
"access_info": {"key1": 'value1', "key2": 'value2'},
'extra': {'area': 'area_A@region_A'}
}
vim_connection_info_without_area = {
"id": 'f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
"vim_id": 'f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
"vim_type": 'openstack',
"access_info": {"key1": 'value1', "key2": 'value2'},
}
vim_connection_info_wrong_area = {
"id": 'f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
"vim_id": 'f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
"vim_type": 'openstack',
"access_info": {"key1": 'value1', "key2": 'value2'},
'extra': {'area': 'area_A'}
}
vnf_instance_updates = {
'vnf_provider': 'provider_A',
'vim_connection_info': [vim_connection_info]
}
vnf_instance_updates_without_area = {
'vnf_provider': 'provider_A',
'vim_connection_info': [vim_connection_info_without_area]
}
vnf_instance_updates_with_wrong_area = {
'vnf_provider': 'provider_A',
'vim_connection_info': [vim_connection_info_wrong_area]
}
rule_area_vendor_namespace = {
vnf_lcm_policies.VNFLCM % action:
"area:%(area)s and vendor:%(vendor)s and "
"namespace:%(namespace)s"
}
rule_vendor = {
vnf_lcm_policies.VNFLCM % action: "vendor:%(vendor)s"
}
test_data = [
# 'expected_status_code': http_client.OK
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_area_A@region_A', 'VENDOR_provider_A'],
'expected_status_code': http_client.OK
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all@all', 'VENDOR_all', 'NAMESPACE_all'],
'expected_status_code': http_client.OK
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all@region_A', 'VENDOR_all', 'NAMESPACE_all'],
'expected_status_code': http_client.OK
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_all@region_A', 'AREA_all@all', 'VENDOR_all',
'NAMESPACE_all'],
'expected_status_code': http_client.OK
},
{
'vnf_instance_updates': vnf_instance_updates_without_area,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_area_A@region_A', 'VENDOR_provider_A'],
'expected_status_code': http_client.OK
},
{
'vnf_instance_updates': vnf_instance_updates_without_area,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all@all', 'VENDOR_all', 'NAMESPACE_all'],
'expected_status_code': http_client.OK
},
{
'vnf_instance_updates': vnf_instance_updates_without_area,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all@region_A', 'VENDOR_all', 'NAMESPACE_all'],
'expected_status_code': http_client.OK
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_vendor,
'roles': ['AREA_area_A@region_A', 'VENDOR_provider_A'],
'expected_status_code': http_client.OK
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_vendor,
'roles': ['AREA_all@all', 'VENDOR_all', 'NAMESPACE_all'],
'expected_status_code': http_client.OK
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_vendor,
'roles': ['AREA_all@region_A', 'VENDOR_all', 'NAMESPACE_all'],
'expected_status_code': http_client.OK
},
# 'expected_status_code': http_client.FORBIDDEN
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_area_A@region_A'],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': ['VENDOR_provider_A'],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_area_A@region_A', 'VENDOR_provider_B'],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_area_B@region_A', 'VENDOR_provider_A'],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all@region_B', 'VENDOR_provider_A'],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all', 'VENDOR_provider_A'],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all@', 'VENDOR_provider_A'],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates_with_wrong_area,
'rules': rule_area_vendor_namespace,
'roles': ['AREA_all@', 'VENDOR_provider_A'],
'expected_status_code': http_client.FORBIDDEN
},
]
return test_data
def get_test_data_policy_vnf_instantiated(action, success_status_code):
vim_connection_info_area_a_region_a = VimConnectionInfo(
id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vim_id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vim_type='openstack',
access_info={"key1": 'value1', "key2": 'value2'},
extra={'area': 'area_A@region_A'}
)
vim_connection_info_without_area = VimConnectionInfo(
id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vim_id='f8c35bd0-4d67-4436-9f11-14b8a84c92aa',
vim_type='openstack',
access_info={"key1": 'value1', "key2": 'value2'}
)
vnf_instance_updates = {
'vnf_provider': 'provider_A',
'vnf_metadata': {"namespace": "namespace_A"},
'vim_connection_info': [vim_connection_info_area_a_region_a]
}
vnf_instance_updates_without_namespace = {
'vnf_provider': 'provider_A',
'vim_connection_info': [vim_connection_info_area_a_region_a]
}
vnf_instance_updates_without_area = {
'vnf_provider': 'provider_A',
'vnf_metadata': {"namespace": "namespace_A"},
'vim_connection_info': [vim_connection_info_without_area]
}
vnf_instance_updates_without_area_namespace = {
'vnf_provider': 'provider_A',
'vim_connection_info': [vim_connection_info_without_area]
}
rule_area_vendor_namespace = {
vnf_lcm_policies.VNFLCM % action:
"area:%(area)s and vendor:%(vendor)s and "
"namespace:%(namespace)s"
}
rule_vendor = {
vnf_lcm_policies.VNFLCM % action: "vendor:%(vendor)s"
}
test_data = [
# 'expected_status_code': success_status_code
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_all@all', 'VENDOR_all', 'NAMESPACE_all'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_all@region_A', 'VENDOR_all', 'NAMESPACE_all'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates': vnf_instance_updates_without_namespace,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates': vnf_instance_updates_without_area,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates':
vnf_instance_updates_without_area_namespace,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': success_status_code
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_vendor,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': success_status_code
},
# 'expected_status_code': http_client.FORBIDDEN
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_B@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_B',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
'NAMESPACE_namespace_B'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_A',
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_all@region_B',
'VENDOR_provider_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_vendor,
'roles': [
'AREA_area_A@region_A',
'VENDOR_provider_B',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates': vnf_instance_updates,
'rules': rule_vendor,
'roles': [
'AREA_area_A@region_A',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates':
vnf_instance_updates_without_area_namespace,
'rules': rule_area_vendor_namespace,
'roles': [
'AREA_area_B@region_A',
'VENDOR_provider_B',
'NAMESPACE_namespace_A'
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates':
vnf_instance_updates_without_area_namespace,
'rules': rule_area_vendor_namespace,
'roles': [
'VENDOR_provider_B',
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnf_instance_updates':
vnf_instance_updates_without_area_namespace,
'rules': rule_vendor,
'roles': [
'VENDOR_provider_B',
],
'expected_status_code': http_client.FORBIDDEN
}
]
return test_data

View File

@ -26,6 +26,7 @@ import webob
from webob import exc
from oslo_config import cfg
from oslo_policy import policy as oslo_policy
from oslo_serialization import jsonutils
from tacker.api.views import vnf_subscriptions as vnf_subscription_view
@ -42,6 +43,7 @@ from tacker.manager import TackerManager
from tacker import objects
from tacker.objects import fields
from tacker.objects import vnf_lcm_subscriptions as subscription_obj
from tacker import policy
from tacker.tests import constants
from tacker.tests.unit import base
from tacker.tests.unit.db import utils
@ -261,11 +263,13 @@ class TestController(base.TestCase):
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch.object(objects.vnf_instance, '_vnf_instance_update')
@mock.patch.object(objects.vnf_instance, '_vnf_instance_create')
@mock.patch.object(objects.vnf_package_vnfd.VnfPackageVnfd, 'get_by_id')
def test_create_without_name_and_description(
self, mock_get_by_id,
mock_vnf_instance_create,
mock_vnf_instance_update,
mock_get_service_plugins,
mock_package_save,
mock_private_create_vnf,
@ -285,6 +289,8 @@ class TestController(base.TestCase):
mock_vnf_instance_create.return_value =\
fakes.return_vnf_instance_model(**updates)
mock_vnf_instance_update.return_value = \
fakes.return_vnf_instance_model(**updates)
body = {'vnfdId': uuidsentinel.vnfd_id,
"vnfInstanceName": "SampleVnf",
@ -396,17 +402,20 @@ class TestController(base.TestCase):
return_value={'VNFM': test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch.object(sync_resource.SyncVnfPackage, 'create_package')
@mock.patch.object(nfvo_client.VnfPackageRequest, "index")
@mock.patch.object(objects.vnf_instance, '_vnf_instance_update')
@mock.patch.object(objects.vnf_instance, '_vnf_instance_create')
@mock.patch.object(objects.vnf_package_vnfd.VnfPackageVnfd, 'get_by_id')
def test_create_vnf_package_not_found(
self, mock_get_by_id_package_vnfd,
mock_vnf_instance_create,
mock_vnf_instance_update,
mock_index, mock_create_pkg,
mock_get_service_plugins,
mock_private_create_vnf,
mock_vnf_package_get_by_id,
mock_update_package_usage_state,
mock_get_vim):
mock_get_vim.return_value = self.vim_info
mock_get_by_id_package_vnfd.side_effect =\
exceptions.VnfPackageVnfdNotFound
@ -421,6 +430,8 @@ class TestController(base.TestCase):
updates = {'vnfd_id': uuidsentinel.vnfd_id}
mock_vnf_instance_create.return_value =\
fakes.return_vnf_instance_model(**updates)
mock_vnf_instance_update.return_value =\
fakes.return_vnf_instance_model(**updates)
body = {'vnfdId': uuidsentinel.vnfd_id}
req = fake_request.HTTPRequest.blank('/vnf_instances')
@ -584,6 +595,9 @@ class TestController(base.TestCase):
mock_get_vnf, mock_insta_notfi_process,
mock_get_service_plugins):
vim = fakes.return_default_vim()
vim.update({'extra': {"area": "area_A@region_A"}})
mock_get_vim.return_value = vim
mock_vnf_instance_get_by_id.return_value =\
fakes.return_vnf_instance_model()
mock_vnf_package_vnfd_get_by_id.return_value = \
@ -595,7 +609,12 @@ class TestController(base.TestCase):
vnf_id=mock_vnf_instance_get_by_id.return_value.id,
status='INACTIVE')
body = {"flavourId": "simple"}
body = {"flavourId": "simple",
"vimConnectionInfo": [
{"id": uuidsentinel.vim_connection_id,
"vimId": uuidsentinel.vim_id,
"vimType": 'openstack'}
]}
body.update({"additionalParams": {"foo_number": 12}})
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s/instantiate' % uuidsentinel.vnf_instance_id)
@ -673,6 +692,9 @@ class TestController(base.TestCase):
mock_get_vnf, mock_insta_notif_process,
mock_get_service_plugins):
vim = fakes.return_default_vim()
vim.update({'extra': {"area": "area_A@region_A"}})
mock_get_vim.return_value = vim
mock_vnf_instance_get_by_id.return_value =\
fakes.return_vnf_instance_model()
mock_vnf_package_vnfd_get_by_id.return_value = \
@ -815,6 +837,9 @@ class TestController(base.TestCase):
mock_get_vnf, mock_insta_notif_process,
mock_get_service_plugins):
vim = fakes.return_default_vim()
vim.update({'extra': {"area": "area_A@region_A"}})
mock_get_vim.return_value = vim
mock_vnf_instance_get_by_id.return_value =\
fakes.return_vnf_instance_model()
mock_vnf_package_vnfd_get_by_id.return_value = \
@ -4676,3 +4701,780 @@ class TestController(base.TestCase):
"<class 'webob.exc.HTTPInternalServerError'>"}}
expected_msg = expected_vnf['tackerFault']['message']
self.assertEqual(expected_msg, resp.json['detail'])
@ddt.ddt
class TestControllerEnhancedPolicy(TestController):
def setUp(self):
super(TestControllerEnhancedPolicy, self).setUp()
cfg.CONF.set_override(
"enhanced_tacker_policy", True, group='oslo_policy')
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch.object(objects.VNF, "vnf_index_list")
@mock.patch.object(objects.VnfInstanceList, "vnf_instance_list")
@mock.patch.object(objects.VnfPackageVnfd, 'get_vnf_package_vnfd')
@mock.patch.object(vnf_lcm_rpc.VNFLcmRPCAPI, "update")
def test_update_vnf_with_pkg_id(
self, mock_update,
mock_vnf_package_vnf_get_vnf_package_vnfd,
mock_vnf_instance_list, mock_vnf_index_list,
mock_get_service_plugins, mock_vnf_by_id):
mock_vnf_by_id.return_value = fakes.return_vnf_instance()
mock_vnf_index_list.return_value = fakes._get_vnf()
mock_vnf_instance_list.return_value = fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
mock_vnf_package_vnf_get_vnf_package_vnfd.return_value =\
fakes.return_vnf_package_vnfd()
body = {"vnfInstanceName": "new_instance_name",
"vnfInstanceDescription": "new_instance_discription",
"vnfPkgId": "2c69a161-0000-4b0f-bcf8-391f8fc76600",
"vnfConfigurableProperties": {"test": "test_value"},
"vnfcInfoModificationsDeleteIds": ["test1"]}
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s' % constants.UUID)
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'PATCH'
# Call Instantiate API
resp = req.get_response(self.app)
self.assertEqual(http_client.ACCEPTED, resp.status_code)
mock_update.assert_called_once()
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch.object(objects.VNF, "vnf_index_list")
def test_update_vnf_status_err(
self,
mock_vnf_index_list,
mock_get_service_plugins,
mock_vnf_by_id):
mock_vnf_by_id.return_value = fakes.return_vnf_instance()
updates = {'status': 'ERROR'}
mock_vnf_index_list.return_value = fakes._get_vnf(**updates)
body = {"vnfInstanceName": "new_instance_name",
"vnfInstanceDescription": "new_instance_discription",
"vnfdId": "2c69a161-0000-4b0f-bcf8-391f8fc76600",
"vnfConfigurableProperties": {
"test": "test_value"
},
"vnfcInfoModificationsDeleteIds": ["test1"],
"metadata": {"testkey": "test_value"},
"vimConnectionInfo": {"id": "testid"}}
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s' % constants.UUID)
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'PATCH'
msg = _("VNF %(id)s status is %(state)s") % {
"id": constants.UUID, "state": "ERROR"}
res = self._make_problem_detail(msg %
{"state": "ERROR"}, 409, 'Conflict')
resp = req.get_response(self.app)
self.assertEqual(res.text, resp.text)
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch.object(sync_resource.SyncVnfPackage, 'create_package')
@mock.patch.object(objects.vnf_package_vnfd.VnfPackageVnfd,
"get_vnf_package_vnfd")
@mock.patch.object(nfvo_client.VnfPackageRequest, "index")
@mock.patch.object(objects.VNF, "vnf_index_list")
@mock.patch.object(objects.VnfInstanceList, "vnf_instance_list")
@mock.patch.object(objects.VnfPackageVnfd, 'get_vnf_package_vnfd')
@mock.patch.object(vnf_lcm_rpc.VNFLcmRPCAPI, "update")
def test_update_vnf_none_vnfd(
self,
mock_update,
mock_vnf_package_vnf_get_vnf_package_vnfd,
mock_vnf_instance_list,
mock_vnf_index_list,
mock_index,
mock_get_vnf_package_vnfd,
mock_create_package,
mock_get_service_plugins,
mock_vnf_by_id):
mock_vnf_by_id.return_value = fakes.return_vnf_instance()
mock_vnf_index_list.return_value = fakes._get_vnf()
mock_vnf_instance_list.return_value = fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
mock_vnf_package_vnf_get_vnf_package_vnfd.return_value = ""
mock_get_vnf_package_vnfd.side_effect =\
exceptions.VnfPackageVnfdNotFound
mock_create_package.return_value = fakes.return_vnf_package_vnfd()
mock_response = mock.MagicMock()
mock_response.ok = True
mock_response.json = mock.MagicMock()
mock_response.json.return_value = ['aaa', 'bbb', 'ccc']
mock_index.return_value = mock_response
body = {
"vnfInstanceName": "new_instance_name",
"vnfInstanceDescription": "new_instance_discription",
"vnfPkgId": "2c69a161-0000-4b0f-bcf8-391f8fc76600",
"vnfConfigurableProperties": {"test": "test_value"},
"vnfcInfoModificationsDeleteIds": ["test1"]
}
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s' % constants.UUID)
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'PATCH'
resp = req.get_response(self.app)
self.assertEqual(http_client.ACCEPTED, resp.status_code)
mock_update.assert_called_once()
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch.object(objects.VNF, "vnf_index_list")
def test_update_vnf_none_vnf_data(
self,
mock_vnf_index_list,
mock_get_service_plugins,
mock_vnf_by_id):
mock_vnf_by_id.return_value = fakes.return_vnf_instance()
mock_vnf_index_list.return_value = None
body = {"vnfInstanceName": "new_instance_name",
"vnfInstanceDescription": "new_instance_discription",
"vnfdId": "2c69a161-0000-4b0f-bcf8-391f8fc76600",
"vnfConfigurableProperties": {
"test": "test_value"
},
"vnfcInfoModificationsDeleteIds": ["test1"],
"metadata": {"testkey": "test_value"},
"vimConnectionInfo": {"id": "testid"}}
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s' % constants.UUID)
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'PATCH'
msg = _("Can not find requested vnf data: %s") % constants.UUID
res = self._make_problem_detail(msg, 404, title='Not Found')
resp = req.get_response(self.app)
self.assertEqual(res.text, resp.text)
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch.object(objects.VNF, "vnf_index_list")
@mock.patch.object(objects.VnfInstanceList, "vnf_instance_list")
def test_update_vnf_none_instance_data(
self,
mock_vnf_instance_list,
mock_vnf_index_list,
mock_get_service_plugins,
mock_vnf_by_id):
mock_vnf_by_id.return_value = fakes.return_vnf_instance()
mock_vnf_index_list.return_value = fakes._get_vnf()
mock_vnf_instance_list.return_value = ""
body = {"vnfInstanceName": "new_instance_name",
"vnfInstanceDescription": "new_instance_discription",
"vnfdId": "2c69a161-0000-4b0f-bcf8-391f8fc76600",
"vnfConfigurableProperties": {
"test": "test_value"
},
"vnfcInfoModificationsDeleteIds": ["test1"],
"metadata": {"testkey": "test_value"},
"vimConnectionInfo": {"id": "testid"}}
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s' % constants.UUID)
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'PATCH'
vnf_data = fakes._get_vnf()
msg = ("Can not find requested vnf instance data: %s") % vnf_data.get(
'vnfd_id')
res = self._make_problem_detail(msg, 404, title='Not Found')
resp = req.get_response(self.app)
self.assertEqual(res.text, resp.text)
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch.object(objects.VNF, "vnf_index_list")
@mock.patch.object(objects.VnfInstanceList, "vnf_instance_list")
@mock.patch.object(objects.VnfPackageVnfd, 'get_vnf_package_vnfd')
@mock.patch.object(vnf_lcm_rpc.VNFLcmRPCAPI, "update")
def test_update_vnf(
self,
mock_update,
mock_vnf_package_vnf_get_vnf_package_vnfd,
mock_vnf_instance_list,
mock_vnf_index_list,
mock_get_service_plugins,
mock_vnf_by_id):
mock_vnf_by_id.return_value = fakes.return_vnf_instance()
mock_vnf_index_list.return_value = fakes._get_vnf()
mock_vnf_instance_list.return_value = fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
mock_vnf_package_vnf_get_vnf_package_vnfd.return_value =\
fakes.return_vnf_package_vnfd()
body = {"vnfInstanceName": "new_instance_name",
"vnfInstanceDescription": "new_instance_discription",
"vnfdId": "2c69a161-0000-4b0f-bcf8-391f8fc76600",
"vnfConfigurableProperties": {
"test": "test_value"
},
"vnfcInfoModificationsDeleteIds": ["test1"],
"metadata": {"testkey": "test_value"},
"vimConnectionInfo": {"id": "testid"}}
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s' % constants.UUID)
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'PATCH'
# Call Instantiate API
resp = req.get_response(self.app)
self.assertEqual(http_client.ACCEPTED, resp.status_code)
mock_update.assert_called_once()
@ddt.data("vnfdId", "vnfPkgId")
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch.object(sync_resource.SyncVnfPackage, 'create_package')
@mock.patch.object(objects.vnf_package_vnfd.VnfPackageVnfd,
"get_vnf_package_vnfd")
@mock.patch.object(nfvo_client.VnfPackageRequest, "index")
@mock.patch.object(objects.VNF, "vnf_index_list")
@mock.patch.object(objects.VnfInstanceList, "vnf_instance_list")
@mock.patch.object(objects.VnfPackageVnfd, 'get_vnf_package_vnfd')
@mock.patch.object(vnf_lcm_rpc.VNFLcmRPCAPI, "update")
def test_update_none_vnf_package_info(
self, input_id,
mock_update,
mock_vnf_package_vnf_get_vnf_package_vnfd,
mock_vnf_instance_list,
mock_vnf_index_list,
mock_index,
mock_get_vnf_package_vnfd,
mock_create_package,
mock_get_service_plugins,
mock_vnf_by_id):
mock_vnf_by_id.return_value = fakes.return_vnf_instance()
mock_vnf_index_list.return_value = fakes._get_vnf()
mock_vnf_instance_list.return_value = fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
mock_vnf_package_vnf_get_vnf_package_vnfd.return_value = ""
mock_get_vnf_package_vnfd.side_effect = \
exceptions.VnfPackageVnfdNotFound
mock_create_package.return_value = fakes.return_vnf_package_vnfd()
mock_response = mock.MagicMock()
mock_response.ok = False
mock_response.json = mock.MagicMock()
mock_response.json.return_value = ['aaa', 'bbb', 'ccc']
mock_index.return_value = mock_response
body = {"vnfInstanceName": "new_instance_name",
"vnfInstanceDescription": "new_instance_discription",
input_id: "2c69a161-0000-4b0f-bcf8-391f8fc76600",
"vnfConfigurableProperties": {"test": "test_value"},
"vnfcInfoModificationsDeleteIds": ["test1"]}
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s' % constants.UUID)
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'PATCH'
resp = req.get_response(self.app)
self.assertEqual(http_client.BAD_REQUEST, resp.status_code)
@ddt.data(*fakes.get_test_data_policy_vnf_not_instantiated('show'))
@ddt.unpack
@mock.patch.object(objects.vnf_instance, "_vnf_instance_get_by_id")
def test_show_enhanced_policy_vnf_not_instantiated(
self, mock_vnf_by_id, vnf_instance_updates,
rules, roles, expected_status_code):
mock_vnf_by_id.return_value = fakes.return_vnf_instance_model(
**vnf_instance_updates)
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s' % uuidsentinel.instance_id)
req.method = 'GET'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(*fakes.get_test_data_policy_vnf_instantiated(
'show', http_client.OK))
@ddt.unpack
@mock.patch.object(objects.VnfInstance, "get_by_id")
def test_show_enhanced_policy_vnf_instantiated(
self, mock_vnf_by_id, vnf_instance_updates,
rules, roles, expected_status_code):
mock_vnf_by_id.return_value = fakes.return_vnf_instance(
instantiated_state=fields.VnfInstanceState.INSTANTIATED,
**vnf_instance_updates)
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s' % uuidsentinel.instance_id)
req.method = 'GET'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(*fakes.get_test_data_policy_create())
@ddt.unpack
@mock.patch.object(objects.VnfPackage, 'get_by_id')
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._create_vnf')
@mock.patch.object(objects.vnf_instance, '_vnf_instance_update')
@mock.patch.object(objects.vnf_instance, '_vnf_instance_create')
@mock.patch.object(vim_client.VimClient, "get_vim")
@mock.patch.object(objects.VnfPackageVnfd, 'get_by_id')
def test_create_enhanced_policy(
self,
mock_vnf_package_vnfd_get_by_id,
mock_get_vim,
mock_vnf_instance_create,
mock_vnf_instance_update,
mock_private_create_vnf,
mock_vnf_package_get_by_id,
vnfd_updates,
rules,
roles,
expected_status_code):
mock_vnf_package_vnfd_get_by_id.return_value = \
fakes.return_vnf_package_vnfd(
**vnfd_updates)
updates = {'vnfd_id': uuidsentinel.vnfd_id}
mock_vnf_instance_create.return_value = \
fakes.return_vnf_instance_model(**updates)
mock_vnf_instance_update.return_value = \
fakes.return_vnf_instance_model(**updates)
mock_get_vim.return_value = self.vim_info
body = {'vnfdId': uuidsentinel.vnfd_id,
'metadata': {"key": "value"}}
req = fake_request.HTTPRequest.blank('/vnf_instances')
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(*fakes.get_test_data_policy_vnf_instantiated(
'terminate', http_client.ACCEPTED))
@ddt.unpack
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._notification_process')
@mock.patch.object(objects.VnfInstance, "save")
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._get_vnf')
@mock.patch.object(objects.VnfInstance, "get_by_id")
def test_terminate_enhanced_policy(
self, mock_vnf_by_id, mock_get_vnf, mock_save,
mock_notification_process,
vnf_instance_updates,
rules, roles, expected_status_code):
vnf_instance_obj = fakes.return_vnf_instance(
instantiated_state=fields.VnfInstanceState.INSTANTIATED,
**vnf_instance_updates)
mock_vnf_by_id.return_value = vnf_instance_obj
mock_get_vnf.return_value = \
self._get_dummy_vnf(vnf_id=vnf_instance_obj.id, status='ACTIVE')
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s/terminate' % uuidsentinel.instance_id)
body = {'terminationType': 'FORCEFUL'}
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(*fakes.get_test_data_policy_vnf_instantiated(
'heal', http_client.ACCEPTED))
@ddt.unpack
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._notification_process')
@mock.patch.object(objects.VnfInstance, "save")
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._get_vnf')
@mock.patch.object(objects.VnfInstance, "get_by_id")
def test_heal_enhanced_policy(
self, mock_vnf_by_id, mock_get_vnf, mock_save,
mock_notification_process,
vnf_instance_updates,
rules, roles, expected_status_code):
vnf_instance_obj = fakes.return_vnf_instance(
instantiated_state=fields.VnfInstanceState.INSTANTIATED,
**vnf_instance_updates)
mock_vnf_by_id.return_value = vnf_instance_obj
mock_get_vnf.return_value = \
self._get_dummy_vnf(vnf_id=vnf_instance_obj.id, status='ACTIVE')
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s/heal' % uuidsentinel.instance_id)
body = {}
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(*fakes.get_test_data_policy_vnf_instantiated(
'delete', http_client.NO_CONTENT))
@ddt.unpack
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._delete')
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._notification_process')
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._get_vnf')
@mock.patch.object(objects.VnfInstance, "get_by_id")
def test_delete_enhanced_policy(
self,
mock_vnf_by_id,
mock_get_vnf,
mock_notification_process,
mock_private_delete,
vnf_instance_updates,
rules, roles, expected_status_code):
vnf_instance_obj = fakes.return_vnf_instance_delete(
**vnf_instance_updates)
mock_vnf_by_id.return_value = vnf_instance_obj
mock_get_vnf.return_value = \
self._get_dummy_vnf(vnf_id=vnf_instance_obj.id, status='ACTIVE')
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s' % uuidsentinel.vnf_instance_id)
req.headers['Content-Type'] = 'application/json'
req.method = 'DELETE'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(*fakes.get_test_data_policy_vnf_instantiated(
'scale', http_client.ACCEPTED))
@ddt.unpack
@mock.patch.object(objects.VnfLcmOpOcc, "create")
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM': FakeVNFMPlugin()})
@mock.patch.object(tacker.db.vnfm.vnfm_db.VNFMPluginDb, "get_vnf")
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._notification_process')
@mock.patch.object(objects.VnfInstance, "save")
@mock.patch.object(objects.VnfInstance, "get_by_id")
def test_scale_enhanced_policy(
self, mock_vnf_by_id,
mock_save,
mock_notification_process,
mock_get_vnf,
mock_get_service_plugins,
mock_opocc_create,
vnf_instance_updates,
rules, roles, expected_status_code):
vnf_instance_obj = fakes.return_vnf_instance(
instantiated_state=fields.VnfInstanceState.INSTANTIATED,
scale_status="scale_status",
**vnf_instance_updates)
mock_vnf_by_id.return_value = vnf_instance_obj
mock_get_vnf.return_value = fakes._get_vnf()
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s/scale' % uuidsentinel.instance_id)
body = {"type": "SCALE_OUT",
"aspectId": "SP1",
"numberOfSteps": 1,
"additionalParams": {
"test": "test_value"}}
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(*fakes.get_test_data_policy_vnf_instantiated(
'update_vnf', http_client.ACCEPTED))
@ddt.unpack
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch.object(objects.VNF, "vnf_index_list")
@mock.patch.object(objects.VnfInstanceList, "vnf_instance_list")
@mock.patch.object(objects.VnfPackageVnfd, 'get_vnf_package_vnfd')
@mock.patch.object(vnf_lcm_rpc.VNFLcmRPCAPI, "update")
@mock.patch.object(objects.VnfInstance, "get_by_id")
def test_update_vnf_enhanced_policy(
self,
mock_vnf_by_id,
mock_update,
mock_vnf_package_vnf_get_vnf_package_vnfd,
mock_vnf_instance_list,
mock_vnf_index_list,
mock_get_service_plugins,
vnf_instance_updates,
rules, roles, expected_status_code):
vnf_instance_obj = fakes.return_vnf_instance(
instantiated_state=fields.VnfInstanceState.INSTANTIATED,
scale_status="scale_status",
**vnf_instance_updates)
mock_vnf_by_id.return_value = vnf_instance_obj
mock_vnf_index_list.return_value = fakes._get_vnf()
mock_vnf_instance_list.return_value = fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED, **vnf_instance_updates)
mock_vnf_package_vnf_get_vnf_package_vnfd.return_value =\
fakes.return_vnf_package_vnfd()
body = {"vnfInstanceName": "new_instance_name",
"vnfInstanceDescription": "new_instance_discription",
"vnfdId": "2c69a161-0000-4b0f-bcf8-391f8fc76600",
"vnfConfigurableProperties": {
"test": "test_value"
},
"vnfcInfoModificationsDeleteIds": ["test1"],
"metadata": {"testkey": "test_value"},
"vimConnectionInfo": {"id": "testid"}}
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s' % constants.UUID)
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'PATCH'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
# Call Instantiate API
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
if expected_status_code != http_client.FORBIDDEN:
mock_update.assert_called_once()
@ddt.data(*fakes.get_test_data_policy_vnf_instantiated(
'change_ext_conn', http_client.ACCEPTED))
@ddt.unpack
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._notification_process')
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._get_vnf')
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(objects.VnfInstance, "save")
@mock.patch.object(vnf_lcm_rpc.VNFLcmRPCAPI, "change_ext_conn")
def test_change_ext_conn_enhanced_policy(
self, mock_rpc, mock_save,
mock_vnf_by_id, mock_get_vnf,
mock_notification_process,
mock_get_service_plugins,
vnf_instance_updates,
rules, roles, expected_status_code):
vnf_instance_obj = fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED,
**vnf_instance_updates
)
mock_vnf_by_id.return_value = vnf_instance_obj
mock_get_vnf.return_value = \
self._get_dummy_vnf(vnf_id=vnf_instance_obj.id, status='ACTIVE')
body = fakes.get_change_ext_conn_request_body()
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s/change_ext_conn' % uuidsentinel.vnf_instance_id)
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
if expected_status_code != http_client.FORBIDDEN:
mock_rpc.assert_called_once()
@ddt.data(*fakes.get_test_data_policy_instantiate())
@ddt.unpack
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch('tacker.api.vnflcm.v1.controller.VnfLcmController.'
'_notification_process')
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._get_vnf')
@mock.patch.object(vim_client.VimClient, "get_vim")
@mock.patch.object(objects.vnf_instance, "_vnf_instance_get_by_id")
@mock.patch.object(objects.VnfInstance, "save")
@mock.patch.object(objects.VnfPackageVnfd, 'get_by_id')
@mock.patch.object(objects.VnfPackage, "get_by_id")
@mock.patch.object(vnf_lcm_rpc.VNFLcmRPCAPI, "instantiate")
def test_instantiate_with_vim_connection_enhanced_policy(
self, mock_instantiate, mock_vnf_package_get_by_id,
mock_vnf_package_vnfd_get_by_id, mock_save,
mock_vnf_instance_get_by_id, mock_get_vim,
mock_get_vnf, mock_insta_notif_process,
mock_get_service_plugins, vnf_instance_updates,
rules, roles, expected_status_code):
vim = fakes.return_default_vim()
vim.update({'extra': {"area": "area_A@region_A"}})
mock_get_vim.return_value = vim
mock_vnf_instance_get_by_id.return_value =\
fakes.return_vnf_instance_model(**vnf_instance_updates)
mock_vnf_package_vnfd_get_by_id.return_value = \
fakes.return_vnf_package_vnfd()
mock_vnf_package_get_by_id.return_value = \
fakes.return_vnf_package_with_deployment_flavour()
mock_get_vnf.return_value = \
self._get_dummy_vnf(
vnf_id=mock_vnf_instance_get_by_id.return_value.id,
status='INACTIVE')
body = {"flavourId": "simple",
"vimConnectionInfo": [
{"id": uuidsentinel.vim_connection_id,
"vimId": uuidsentinel.vim_id,
"vimType": 'openstack'}
]}
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s/instantiate' % uuidsentinel.vnf_instance_id)
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
# Call Instantiate API
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
if expected_status_code != http_client.FORBIDDEN:
self.assertTrue('Location' in resp.headers.keys())
expected_location = (self.expected_location_prefix +
str(mock_insta_notif_process.return_value))
self.assertEqual(expected_location, resp.headers['location'])
mock_instantiate.assert_called_once()
mock_get_vnf.assert_called_once()
mock_insta_notif_process.assert_called_once()
@ddt.data(*fakes.get_test_data_policy_index())
@ddt.unpack
@mock.patch.object(objects.VnfInstanceList, "get_by_filters")
def test_index_enhanced_policy(self, mock_vnf_list, vnf_instance_list,
rules, roles, expected_vnf_inst_ids):
mock_vnf_list.return_value = vnf_instance_list
req = fake_request.HTTPRequest.blank('/vnf_instances')
req.headers['Content-Type'] = 'application/json'
req.method = 'GET'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(http_client.OK, resp.status_code)
self.assertEqual(
expected_vnf_inst_ids, [inst.get('id') for inst in resp.json])
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM':
test_nfvo_plugin.FakeVNFMPlugin()})
@mock.patch('tacker.api.vnflcm.v1.controller.VnfLcmController.'
'_notification_process')
@mock.patch('tacker.api.vnflcm.v1.controller.'
'VnfLcmController._get_vnf')
@mock.patch.object(vim_client.VimClient, "get_vim")
@mock.patch.object(objects.vnf_instance, "_vnf_instance_get_by_id")
@mock.patch.object(objects.VnfInstance, "save")
@mock.patch.object(objects.VnfPackageVnfd, 'get_by_id')
@mock.patch.object(objects.VnfPackage, "get_by_id")
@mock.patch.object(vnf_lcm_rpc.VNFLcmRPCAPI, "instantiate")
def test_instantiate_add_area(
self, mock_instantiate, mock_vnf_package_get_by_id,
mock_vnf_package_vnfd_get_by_id, mock_save,
mock_vnf_instance_get_by_id, mock_get_vim,
mock_get_vnf, mock_insta_notif_process,
mock_get_service_plugins):
vim = fakes.return_default_vim()
vim.update({'extra': {"area": "area_A@region_A"}})
mock_get_vim.return_value = vim
mock_vnf_instance_get_by_id.return_value =\
fakes.return_vnf_instance_model()
mock_vnf_package_vnfd_get_by_id.return_value = \
fakes.return_vnf_package_vnfd()
mock_vnf_package_get_by_id.return_value = \
fakes.return_vnf_package_with_deployment_flavour()
mock_get_vnf.return_value = \
self._get_dummy_vnf(
vnf_id=mock_vnf_instance_get_by_id.return_value.id,
status='INACTIVE')
body = {"flavourId": "simple",
"vimConnectionInfo": [
{"id": uuidsentinel.vim_connection_id,
"vimId": uuidsentinel.vim_id,
"vimType": 'openstack'}
]}
req = fake_request.HTTPRequest.blank(
'/vnf_instances/%s/instantiate' % uuidsentinel.vnf_instance_id)
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
# Call Instantiate API
resp = req.get_response(self.app)
self.assertTrue('Location' in resp.headers.keys())
expected_location = (self.expected_location_prefix +
str(mock_insta_notif_process.return_value))
self.assertEqual(expected_location, resp.headers['location'])
self.assertEqual(http_client.ACCEPTED, resp.status_code)
mock_instantiate.assert_called_once()
mock_get_vnf.assert_called_once()
mock_insta_notif_process.assert_called_once()

View File

@ -16,8 +16,10 @@
from copy import deepcopy
import datetime
from http import client as http_client
import iso8601
import os
from oslo_utils import uuidutils
import shutil
import uuid
import webob
@ -30,6 +32,7 @@ from tacker.objects import vnf_deployment_flavour as vnf_deployment_flavour_obj
from tacker.objects import vnf_package as vnf_package_obj
from tacker.objects import vnf_package_vnfd as vnf_package_vnfd_obj
from tacker.objects import vnf_software_image as vnf_software_image_obj
from tacker.policies import vnf_package as vnf_package_policies
from tacker.tests import constants
from tacker.tests import utils
from tacker.tests import uuidsentinel
@ -296,3 +299,95 @@ def return_vnfd_data(csar_without_tosca_meta=False):
shutil.rmtree(csar_temp_dir)
return file_path_and_data
def get_test_data_pkg_index():
rules = {
vnf_package_policies.VNFPKGM % 'index':
"vendor:%(vendor)s"
}
id_a = uuidutils.generate_uuid()
pkg_provider_a = return_vnfpkg_obj(
vnf_package_updates={'id': uuidutils.generate_uuid()},
vnfd_updates={
'vnf_provider': 'provider_A',
'id': id_a,
})
id_b = uuidutils.generate_uuid()
pkg_provider_b = return_vnfpkg_obj(
vnf_package_updates={'id': uuidutils.generate_uuid()},
vnfd_updates={
'vnf_provider': 'provider_B',
'id': id_b,
})
test_data = [
{
'pkg_list': [pkg_provider_a],
'rules': rules,
'roles': ['VENDOR_provider_A'],
'expected_pkg_ids': [pkg_provider_a.id]
},
{
'pkg_list': [pkg_provider_b],
'rules': rules,
'roles': ['VENDOR_provider_B'],
'expected_pkg_ids': [pkg_provider_b.id]
},
{
'pkg_list': [pkg_provider_a, pkg_provider_b],
'rules': rules,
'roles': ['VENDOR_all'],
'expected_pkg_ids': [pkg_provider_a.id, pkg_provider_b.id]
},
]
return test_data
def get_test_data_pkg_uploaded(action, success_status_code):
rules = {
vnf_package_policies.VNFPKGM % action:
"vendor:%(vendor)s"
}
test_data = [
# 'expected_status_code': success_status_code
{
'vnfd_updates': {
'vnf_provider': 'provider_A',
},
'rules': rules,
'roles': [
'VENDOR_provider_A',
],
'expected_status_code': success_status_code
},
{
'vnfd_updates': {
'vnf_provider': 'provider_A',
},
'rules': rules,
'roles': [
'VENDOR_all',
],
'expected_status_code': success_status_code
},
# 'expected_status_code': http_client.FORBIDDEN
{
'vnfd_updates': {
'vnf_provider': 'provider_A',
},
'rules': rules,
'roles': [
'VENDOR_provider_B',
],
'expected_status_code': http_client.FORBIDDEN
},
{
'vnfd_updates': {
'vnf_provider': 'provider_A',
},
'rules': rules,
'roles': [],
'expected_status_code': http_client.FORBIDDEN
},
]
return test_data

View File

@ -24,17 +24,20 @@ import urllib
from webob import exc
from oslo_config import cfg
from oslo_policy import policy as oslo_policy
from oslo_serialization import jsonutils
from tacker.api.vnfpkgm.v1 import controller
from tacker.common import exceptions as tacker_exc
from tacker.conductor.conductorrpc.vnf_pkgm_rpc import VNFPackageRPCAPI
from tacker import context
from tacker.glance_store import store as glance_store
from tacker import objects
from tacker.objects import fields
from tacker.objects import vnf_package
from tacker.objects.vnf_package import VnfPackagesList
from tacker.objects.vnf_software_image import VnfSoftwareImage
from tacker import policy
from tacker.tests import constants
from tacker.tests.unit import base
from tacker.tests.unit import fake_request
@ -1327,3 +1330,192 @@ class TestController(base.TestCase):
self.controller.fetch_vnf_package_artifacts,
req, constants.UUID,
constants.ARTIFACT_PATH)
@ddt.ddt
class TestControllerEnhancedPolicy(TestController):
def setUp(self):
super(TestControllerEnhancedPolicy, self).setUp()
cfg.CONF.set_override(
"enhanced_tacker_policy", True, group='oslo_policy')
@ddt.data(*fakes.get_test_data_pkg_uploaded('show', http_client.OK))
@ddt.unpack
@mock.patch.object(VnfSoftwareImage, 'get_by_id')
@mock.patch.object(vnf_package.VnfPackage, "get_by_id")
def test_show_enhanced_policy(
self, mock_vnf_by_id, mock_sw_image_by_id,
vnfd_updates, rules, roles, expected_status_code):
mock_vnf_by_id.return_value = fakes.return_vnfpkg_obj(
vnfd_updates=vnfd_updates)
mock_sw_image_by_id.return_value = fakes.return_software_image()
req = fake_request.HTTPRequest.blank(
'/vnf_packages/%s' % constants.UUID)
req.method = 'GET'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(
*fakes.get_test_data_pkg_uploaded('delete', http_client.NO_CONTENT))
@ddt.unpack
@mock.patch.object(vnf_package.VnfPackage, "get_by_id")
@mock.patch.object(VNFPackageRPCAPI, "delete_vnf_package")
def test_delete_enhanced_policy(
self, mock_delete_rpc, mock_vnf_by_id, vnfd_updates, rules, roles,
expected_status_code):
vnfpkg_updates = {'operational_state': 'DISABLED'}
mock_vnf_by_id.return_value = fakes.return_vnfpkg_obj(
vnf_package_updates=vnfpkg_updates, vnfd_updates=vnfd_updates)
req = fake_request.HTTPRequest.blank(
'/vnf_packages/%s' % constants.UUID)
req.headers['Content-Type'] = 'application/json'
req.method = 'DELETE'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(*fakes.get_test_data_pkg_uploaded('patch', http_client.OK))
@ddt.unpack
@mock.patch.object(vnf_package.VnfPackage, "get_by_id")
@mock.patch.object(vnf_package.VnfPackage, "save")
def test_patch_enhanced_policy(self, mock_save, mock_vnf_by_id,
vnfd_updates, rules, roles, expected_status_code):
vnf_package_updates = {'operational_state': 'DISABLED'}
mock_vnf_by_id.return_value = \
fakes.return_vnfpkg_obj(
vnf_package_updates=vnf_package_updates,
vnfd_updates=vnfd_updates
)
req_body = {"operationalState": "ENABLED",
"userDefinedData": {"testKey1": "val01",
"testKey2": "val02", "testkey3": "val03"}}
req = fake_request.HTTPRequest.blank(
'/vnf_packages/%s'
% constants.UUID)
req.headers['Content-Type'] = 'application/json'
req.method = 'PATCH'
req.body = jsonutils.dump_as_bytes(req_body)
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context(
'fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
if expected_status_code != http_client.FORBIDDEN:
self.assertEqual(req_body, jsonutils.loads(resp.body))
@ddt.data(*fakes.get_test_data_pkg_uploaded(
'get_vnf_package_vnfd', http_client.OK))
@ddt.unpack
@mock.patch.object(VNFPackageRPCAPI, "get_vnf_package_vnfd")
@mock.patch.object(vnf_package.VnfPackage, "get_by_id")
def test_get_vnf_package_vnfd_enhanced_policy(
self, mock_vnf_by_id, mock_get_vnf_package_vnfd,
vnfd_updates, rules, roles, expected_status_code):
mock_vnf_by_id.return_value = fakes.return_vnfpkg_obj(
vnfd_updates=vnfd_updates)
mock_get_vnf_package_vnfd.return_value = fakes.return_vnfd_data()
req = fake_request.HTTPRequest.blank(
'/vnf_packages/%s/vnfd'
% constants.UUID)
req.headers['Accept'] = 'application/zip'
req.method = 'GET'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context('fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(*fakes.get_test_data_pkg_uploaded('fetch_package_content',
http_client.ACCEPTED))
@ddt.unpack
@mock.patch.object(controller.VnfPkgmController, "_download")
@mock.patch.object(controller.VnfPkgmController, "_get_range_from_request")
@mock.patch.object(glance_store, 'get_csar_size')
@mock.patch.object(controller.VnfPkgmController, '_get_vnf_package')
@mock.patch.object(vnf_package.VnfPackage, 'save')
def test_fetch_vnf_package_content_enhanced_policy(
self,
mock_save,
mock_get_vnf_package,
mock_get_csar_size,
mock_get_range,
mock_download,
vnfd_updates, rules, roles, expected_status_code):
mock_get_vnf_package.return_value = fakes.return_vnfpkg_obj(
vnfd_updates=vnfd_updates)
mock_get_csar_size.return_value = 1000
mock_get_range.return_value = "10-20, 21-30"
mock_download.return_value = "Response"
request = fake_request.HTTPRequest.blank(
'/vnf_packages/%s/package_content' % constants.UUID)
request.method = 'GET'
request.headers["Range"] = 'bytes=10-20,21-30'
request.response = ""
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context('fake', 'fake', roles=roles)
resp = request.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(*fakes.get_test_data_pkg_uploaded('fetch_artifact',
http_client.OK))
@ddt.unpack
@mock.patch.object(controller.VnfPkgmController, "_get_csar_path")
@mock.patch.object(vnf_package.VnfPackage, "get_by_id")
def test_fetch_vnf_package_artifacts_enhanced_policy(
self, mock_vnf_by_id, mock_get_csar_path,
vnfd_updates, rules, roles, expected_status_code):
mock_vnf_by_id.return_value = fakes.return_vnfpkg_obj(
vnfd_updates=vnfd_updates)
base_path = os.path.dirname(os.path.abspath(__file__))
extract_path = os.path.join(base_path, '../../etc/samples/'
'sample_vnf_package_csar_in_meta_and_manifest')
mock_get_csar_path.return_value = extract_path
req = fake_request.HTTPRequest.blank(
'/vnf_packages/%s/artifacts/%s'
% (constants.UUID, constants.ARTIFACT_PATH))
req.method = 'GET'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context('fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(expected_status_code, resp.status_code)
@ddt.data(*fakes.get_test_data_pkg_index())
@ddt.unpack
@mock.patch.object(VnfPackagesList, "get_by_filters")
def test_index_enhanced_policy(
self, mock_vnf_list, pkg_list, rules, roles, expected_pkg_ids):
mock_vnf_list.return_value = pkg_list
req = fake_request.HTTPRequest.blank('/vnf_packages')
req.headers['Content-Type'] = 'application/json'
req.method = 'GET'
policy.set_rules(oslo_policy.Rules.from_dict(rules), overwrite=True)
ctx = context.Context('fake', 'fake', roles=roles)
resp = req.get_response(fakes.wsgi_app_v1(fake_auth_context=ctx))
self.assertEqual(http_client.OK, resp.status_code)
self.assertEqual(
expected_pkg_ids, [pkg.get('id') for pkg in resp.json])