Merge "Add create vnf instance API"

This commit is contained in:
Zuul 2020-04-22 04:10:41 +00:00 committed by Gerrit Code Review
commit 9410a518ba
13 changed files with 755 additions and 1 deletions

View File

@ -0,0 +1,33 @@
# Copyright (C) 2020 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Schema for vnf lcm APIs.
"""
from tacker.api.validation import parameter_types
create = {
'type': 'object',
'properties': {
'vnfdId': parameter_types.uuid,
'vnfInstanceName': parameter_types.name_allow_zero_min_length,
'vnfInstanceDescription': parameter_types.description,
},
'required': ['vnfdId'],
'additionalProperties': False,
}

View File

@ -18,6 +18,103 @@ Common parameter types for validating request Body.
"""
import re
import unicodedata
import six
def _is_printable(char):
"""determine if a unicode code point is printable.
This checks if the character is either "other" (mostly control
codes), or a non-horizontal space. All characters that don't match
those criteria are considered printable; that is: letters;
combining marks; numbers; punctuation; symbols; (horizontal) space
separators.
"""
category = unicodedata.category(char)
return (not category.startswith("C") and
(not category.startswith("Z") or category == "Zs"))
def _get_all_chars():
for i in range(0xFFFF):
yield six.unichr(i)
# build a regex that matches all printable characters. This allows
# spaces in the middle of the name. Also note that the regexp below
# deliberately allows the empty string. This is so only the constraint
# which enforces a minimum length for the name is triggered when an
# empty string is tested. Otherwise it is not deterministic which
# constraint fails and this causes issues for some unittests when
# PYTHONHASHSEED is set randomly.
def _build_regex_range(ws=True, invert=False, exclude=None):
"""Build a range regex for a set of characters in utf8.
This builds a valid range regex for characters in utf8 by
iterating the entire space and building up a set of x-y ranges for
all the characters we find which are valid.
:param ws: should we include whitespace in this range.
:param invert: invert the logic
:param exclude: any characters we want to exclude
The inversion is useful when we want to generate a set of ranges
which is everything that's not a certain class. For instance,
produce all all the non printable characters as a set of ranges.
"""
if exclude is None:
exclude = []
regex = ""
# are we currently in a range
in_range = False
# last character we found, for closing ranges
last = None
# last character we added to the regex, this lets us know that we
# already have B in the range, which means we don't need to close
# it out with B-B. While the later seems to work, it's kind of bad form.
last_added = None
def valid_char(char):
if char in exclude:
result = False
elif ws:
result = _is_printable(char)
else:
# Zs is the unicode class for space characters, of which
# there are about 10 in this range.
result = (_is_printable(char) and
unicodedata.category(char) != "Zs")
if invert is True:
return not result
return result
# iterate through the entire character range. in_
for c in _get_all_chars():
if valid_char(c):
if not in_range:
regex += re.escape(c)
last_added = c
in_range = True
else:
if in_range and last != last_added:
regex += "-" + re.escape(last)
in_range = False
last = c
else:
if in_range:
regex += "-" + re.escape(c)
return regex
valid_description_regex_base = '^[%s]*$'
valid_description_regex = valid_description_regex_base % (
_build_regex_range())
keyvalue_pairs = {
'type': 'object',
@ -28,3 +125,16 @@ keyvalue_pairs = {
},
'additionalProperties': False
}
description = {
'type': 'string', 'minLength': 0, 'maxLength': 1024,
'pattern': valid_description_regex,
}
uuid = {
'type': 'string', 'format': 'uuid'
}
name_allow_zero_min_length = {
'type': 'string', 'minLength': 0, 'maxLength': 255
}

View File

@ -20,6 +20,7 @@ Internal implementation of request Body validating middleware.
import jsonschema
from jsonschema import exceptions as jsonschema_exc
from oslo_utils import uuidutils
import rfc3986
import six
@ -32,6 +33,11 @@ def _validate_uri(instance):
require_authority=True)
@jsonschema.FormatChecker.cls_checks('uuid')
def _validate_uuid_format(instance):
return uuidutils.is_uuid_like(instance)
class FormatChecker(jsonschema.FormatChecker):
"""A FormatChecker can output the message from cause exception

View File

@ -0,0 +1,60 @@
# Copyright (C) 2020 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.common import utils
from tacker.objects import fields
class ViewBuilder(object):
def _get_links(self, vnf_instance):
links = {
"self": {
"href": '/vnflcm/v1/vnf_instances/%s' % vnf_instance.id
}
}
instantiate_link = {
"instantiate": {
"href": '/vnflcm/v1/vnf_instances/%s/instantiate'
% vnf_instance.id
}
}
if (vnf_instance.instantiation_state ==
fields.VnfInstanceState.NOT_INSTANTIATED):
instantiate_link = {
"instantiate": {
"href": '/vnflcm/v1/vnf_instances/%s/instantiate'
% vnf_instance.id
}
}
links.update(instantiate_link)
return {"_links": links}
def _get_vnf_instance_info(self, vnf_instance):
vnf_instance_dict = vnf_instance.to_dict()
vnf_instance_dict = utils.convert_snakecase_to_camelcase(
vnf_instance_dict)
links = self._get_links(vnf_instance)
vnf_instance_dict.update(links)
return vnf_instance_dict
def create(self, vnf_instance):
return self._get_vnf_instance_info(vnf_instance)

View File

@ -13,15 +13,60 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
from six.moves import http_client
import webob
from tacker.api.schemas import vnf_lcm
from tacker.api import validation
from tacker.api.views import vnf_lcm as vnf_lcm_view
from tacker.common import exceptions
from tacker.common import utils
from tacker import objects
from tacker.objects import fields
from tacker.policies import vnf_lcm as vnf_lcm_policies
from tacker import wsgi
class VnfLcmController(wsgi.Controller):
_view_builder_class = vnf_lcm_view.ViewBuilder
def _get_vnf_instance_href(self, vnf_instance):
return '/vnflcm/v1/vnf_instances/%s' % vnf_instance.id
@wsgi.response(http_client.CREATED)
@wsgi.expected_errors((http_client.BAD_REQUEST, http_client.FORBIDDEN))
@validation.schema(vnf_lcm.create)
def create(self, request, body):
raise webob.exc.HTTPNotImplemented()
context = request.environ['tacker.context']
context.can(vnf_lcm_policies.VNFLCM % 'create')
req_body = utils.convert_camelcase_to_snakecase(body)
vnfd_id = req_body.get('vnfd_id')
try:
vnfd = objects.VnfPackageVnfd.get_by_id(request.context,
vnfd_id)
except exceptions.VnfPackageVnfdNotFound as exc:
raise webob.exc.HTTPBadRequest(explanation=six.text_type(exc))
vnf_instance = objects.VnfInstance(
context=request.context,
vnf_instance_name=req_body.get('vnf_instance_name'),
vnf_instance_description=req_body.get(
'vnf_instance_description'),
vnfd_id=vnfd_id,
instantiation_state=fields.VnfInstanceState.NOT_INSTANTIATED,
vnf_provider=vnfd.vnf_provider,
vnf_product_name=vnfd.vnf_product_name,
vnf_software_version=vnfd.vnf_software_version,
vnfd_version=vnfd.vnfd_version,
tenant_id=request.context.project_id)
vnf_instance.create()
result = self._view_builder.create(vnf_instance)
headers = {"location": self._get_vnf_instance_href(vnf_instance)}
return wsgi.ResponseObject(result, headers=headers)
def show(self, request, id):
raise webob.exc.HTTPNotImplemented()

View File

@ -219,6 +219,10 @@ class VnfResourceNotFound(NotFound):
message = _("No vnf resource with id %(id)s.")
class VnfPackageVnfdNotFound(NotFound):
message = _("No vnf package vnfd with vnfd_id %(id)s.")
class VnfDeploymentFlavourNotFound(NotFound):
message = _("No vnf deployment flavour with id %(id)s.")

View File

@ -332,6 +332,66 @@ def chunkiter(fp, chunk_size=65536):
break
def convert_camelcase_to_snakecase(request_data):
"""Converts dict keys or list of dict keys from camelCase to snake_case.
Returns a dict with keys or list with dict keys, in snake_case.
:param request_data: dict with keys or list with items, in camelCase.
"""
def convert(name):
name_with_underscores = re.sub(
'(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2',
name_with_underscores).lower()
if isinstance(request_data, dict):
new_dict = {}
for key, property_value in request_data.items():
property_value = convert_camelcase_to_snakecase(property_value)
underscore_joined = convert(key)
new_dict[underscore_joined] = property_value
return new_dict
if isinstance(request_data, list):
new_list = []
for property_value in request_data:
new_list.append(
convert_camelcase_to_snakecase(property_value))
return new_list
return request_data
def convert_snakecase_to_camelcase(request_data):
"""Converts dict keys or list of dict keys from snake_case to camelCase.
Returns a dict with keys or list with dict key, in camelCase.
:param request_data: dict with keys or list with items, in snake_case.
"""
def convert(name):
return re.sub('_([a-z])',
lambda match: match.group(1).upper(), name)
if isinstance(request_data, dict):
new_dict = {}
for key, property_value in request_data.items():
property_value = convert_snakecase_to_camelcase(property_value)
camelcase = convert(key)
new_dict[camelcase] = property_value
return new_dict
if isinstance(request_data, list):
new_list = []
for property_value in request_data:
new_list.append(
convert_snakecase_to_camelcase(property_value))
return new_list
return request_data
class CooperativeReader(object):
"""An eventlet thread friendly class for reading in image data.

View File

@ -17,6 +17,7 @@
import itertools
from tacker.policies import base
from tacker.policies import vnf_lcm
from tacker.policies import vnf_package
@ -24,4 +25,5 @@ def list_rules():
return itertools.chain(
base.list_rules(),
vnf_package.list_rules(),
vnf_lcm.list_rules(),
)

View File

@ -0,0 +1,40 @@
# Copyright (C) 2020 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from tacker.policies import base
VNFLCM = 'os_nfv_orchestration_api:vnf_instances:%s'
rules = [
policy.DocumentedRuleDefault(
name=VNFLCM % 'create',
check_str=base.RULE_ADMIN_OR_OWNER,
description="Creates vnf instance.",
operations=[
{
'method': 'POST',
'path': '/vnflcm/v1/vnf_instances'
}
]
),
]
def list_rules():
return rules

View File

@ -36,3 +36,57 @@ class TestChangeMemory(testtools.TestCase):
actual_val = utils.change_memory_unit("1 GB", "MB")
expected_val = 1024
self.assertEqual(expected_val, actual_val)
class TestCamelToSnakeCase(testtools.TestCase):
def test_convert_camelcase_to_snakecase_dict(self):
"""Only the dict keys should be converted to snakecase"""
actual_val = utils.convert_camelcase_to_snakecase(
{"camelCaseKey": "camelCaseValue"})
expected_val = {"camel_case_key": "camelCaseValue"}
self.assertEqual(expected_val, actual_val)
def test_convert_camelcase_to_snakecase_list_with_dict_items(self):
"""Only the dict keys from list should be converted to snakecase"""
data = [{"camelCaseKey": "camelCaseValue"}]
actual_val = utils.convert_camelcase_to_snakecase(data)
expected_val = [{"camel_case_key": "camelCaseValue"}]
self.assertEqual(expected_val, actual_val)
def test_convert_camelcase_to_snakecase_list_with_string_items(self):
"""Conversion of camelcase to snakecase should be ignored.
For simple list with string items, the elements which are actual
values should be ignored during conversion
"""
data = ["camelCaseValue1", "camelCaseValue2"]
actual_val = utils.convert_snakecase_to_camelcase(data)
expected_val = ["camelCaseValue1", "camelCaseValue2"]
self.assertEqual(expected_val, actual_val)
class TestSnakeToCamelCase(testtools.TestCase):
def test_convert_snakecase_to_camelcase_dict(self):
"""Only the dict keys from list should be converted to camelcase"""
actual_val = utils.convert_snakecase_to_camelcase(
{"snake_case_key": "snake_case_value"})
expected_val = {"snakeCaseKey": "snake_case_value"}
self.assertEqual(expected_val, actual_val)
def test_convert_snakecase_to_camelcase_list_with_dict_items(self):
"""Only the dict keys from list should be converted to camelcase"""
data = [{"snake_case_key": "snake_case_value"}]
actual_val = utils.convert_snakecase_to_camelcase(data)
expected_val = [{"snakeCaseKey": "snake_case_value"}]
self.assertEqual(expected_val, actual_val)
def test_convert_snakecase_to_camelcase_list_with_string_items(self):
"""Conversion of snakecase to camelcase should be ignored.
For simple list with string items, the elements which are actual
values should be ignored during conversion
"""
data = ["snake_case_value1", "snake_case_value2"]
actual_val = utils.convert_snakecase_to_camelcase(data)
expected_val = ["snake_case_value1", "snake_case_value2"]
self.assertEqual(expected_val, actual_val)

View File

View File

@ -0,0 +1,145 @@
# Copyright (C) 2020 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import iso8601
import os
import webob
from tacker.api.vnflcm.v1.router import VnflcmAPIRouter
from tacker import context
from tacker.db.db_sqlalchemy import models
from tacker.objects import fields
from tacker.tests import constants
from tacker.tests import uuidsentinel
from tacker import wsgi
def fake_vnf_package_vnfd_model_dict(**updates):
vnfd = {
'package_uuid': uuidsentinel.package_uuid,
'deleted': False,
'deleted_at': None,
'updated_at': None,
'created_at': datetime.datetime(2020, 1, 1, 1, 1, 1,
tzinfo=iso8601.UTC),
'vnf_product_name': 'Sample VNF',
'vnf_provider': 'test vnf provider',
'vnf_software_version': '1.0',
'vnfd_id': uuidsentinel.vnfd_id,
'vnfd_version': '1.0',
'id': constants.UUID,
}
if updates:
vnfd.update(updates)
return vnfd
def return_vnf_package_vnfd():
model_obj = models.VnfPackageVnfd()
model_obj.update(fake_vnf_package_vnfd_model_dict())
return model_obj
def _model_non_instantiated_vnf_instance(**updates):
vnf_instance = {
'created_at': datetime.datetime(2020, 1, 1, 1, 1, 1,
tzinfo=iso8601.UTC),
'deleted': False,
'deleted_at': None,
'id': uuidsentinel.vnf_instance_id,
'instantiated_vnf_info': None,
'instantiation_state': fields.VnfInstanceState.NOT_INSTANTIATED,
'updated_at': None,
'vim_connection_info': [],
'vnf_instance_description': 'Vnf instance description',
'vnf_instance_name': 'Vnf instance name',
'vnf_product_name': 'Sample VNF',
'vnf_provider': 'Vnf provider',
'vnf_software_version': '1.0',
'tenant_id': uuidsentinel.tenant_id,
'vnfd_id': uuidsentinel.vnfd_id,
'vnfd_version': '1.0'}
if updates:
vnf_instance.update(**updates)
return vnf_instance
def return_vnf_instance_model(
instantiated_state=fields.VnfInstanceState.NOT_INSTANTIATED,
**updates):
model_obj = models.VnfInstance()
if instantiated_state == fields.VnfInstanceState.NOT_INSTANTIATED:
model_obj.update(_model_non_instantiated_vnf_instance(**updates))
return model_obj
def fake_vnf_instance_response(**updates):
vnf_instance = {
'vnfInstanceDescription': 'Vnf instance description',
'vnfInstanceName': 'Vnf instance name',
'vnfProductName': 'Sample VNF',
'_links': {
'self': {'href': os.path.join('/vnflcm/v1/vnf_instances/',
uuidsentinel.vnf_instance_id)},
'instantiate': {
'href': os.path.join('/vnflcm/v1/vnf_instances',
uuidsentinel.vnf_instance_id, 'instantiate')
}
},
'instantiationState': 'NOT_INSTANTIATED',
'vnfProvider': 'Vnf provider',
'vnfdId': uuidsentinel.vnfd_id,
'vnfdVersion': '1.0',
'vnfSoftwareVersion': '1.0',
'id': uuidsentinel.vnf_instance_id
}
if updates:
vnf_instance.update(**updates)
return vnf_instance
class InjectContext(wsgi.Middleware):
"""Add a 'tacker.context' to WSGI environ."""
def __init__(self, context, *args, **kwargs):
self.context = context
super(InjectContext, self).__init__(*args, **kwargs)
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
req.environ['tacker.context'] = self.context
return self.application
def wsgi_app_v1(fake_auth_context=None):
inner_app_v1 = VnflcmAPIRouter()
if fake_auth_context is not None:
ctxt = fake_auth_context
else:
ctxt = context.ContextBase(uuidsentinel.user_id,
uuidsentinel.project_id, is_admin=True)
api_v1 = InjectContext(ctxt, inner_app_v1)
return api_v1

View File

@ -0,0 +1,195 @@
# Copyright (C) 2020 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from oslo_serialization import jsonutils
from six.moves import http_client
from webob import exc
from tacker.api.vnflcm.v1 import controller
from tacker.common import exceptions
from tacker import objects
from tacker.tests.unit import base
from tacker.tests.unit import fake_request
from tacker.tests.unit.vnflcm import fakes
from tacker.tests import uuidsentinel
@ddt.ddt
class TestController(base.TestCase):
def setUp(self):
super(TestController, self).setUp()
self.controller = controller.VnfLcmController()
@property
def app(self):
return fakes.wsgi_app_v1()
@mock.patch.object(objects.vnf_instance, '_vnf_instance_create')
@mock.patch.object(objects.VnfPackageVnfd, 'get_by_id')
def test_create_without_name_and_description(
self, mock_get_by_id, mock_vnf_instance_create):
mock_get_by_id.return_value = fakes.return_vnf_package_vnfd()
updates = {'vnf_instance_description': None,
'vnf_instance_name': None}
mock_vnf_instance_create.return_value =\
fakes.return_vnf_instance_model(**updates)
req = fake_request.HTTPRequest.blank('/vnf_instances')
body = {'vnfdId': uuidsentinel.vnfd_id}
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
# Call create API
resp = req.get_response(self.app)
self.assertEqual(http_client.CREATED, resp.status_code)
updates = {'vnfInstanceDescription': None, 'vnfInstanceName': None}
expected_vnf = fakes.fake_vnf_instance_response(**updates)
location_header = ('http://localhost/vnflcm/v1/vnf_instances/%s'
% resp.json['id'])
self.assertEqual(expected_vnf, resp.json)
self.assertEqual(location_header, resp.headers['location'])
@mock.patch.object(objects.vnf_instance, '_vnf_instance_create')
@mock.patch.object(objects.VnfPackageVnfd, 'get_by_id')
def test_create_with_name_and_description(
self, mock_get_by_id, mock_vnf_instance_create):
mock_get_by_id.return_value = fakes.return_vnf_package_vnfd()
updates = {'vnf_instance_description': 'SampleVnf Description',
'vnf_instance_name': 'SampleVnf'}
mock_vnf_instance_create.return_value =\
fakes.return_vnf_instance_model(**updates)
body = {'vnfdId': uuidsentinel.vnfd_id,
"vnfInstanceName": "SampleVnf",
"vnfInstanceDescription": "SampleVnf Description"}
req = fake_request.HTTPRequest.blank('/vnf_instances')
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
# Call Create API
resp = req.get_response(self.app)
self.assertEqual(http_client.CREATED, resp.status_code)
updates = {"vnfInstanceName": "SampleVnf",
"vnfInstanceDescription": "SampleVnf Description"}
expected_vnf = fakes.fake_vnf_instance_response(**updates)
location_header = ('http://localhost/vnflcm/v1/vnf_instances/%s'
% resp.json['id'])
self.assertEqual(expected_vnf, resp.json)
self.assertEqual(location_header, resp.headers['location'])
@ddt.data(
{'attribute': 'vnfdId', 'value': True,
'expected_type': 'uuid'},
{'attribute': 'vnfdId', 'value': 123,
'expected_type': 'uuid'},
{'attribute': 'vnfInstanceName', 'value': True,
'expected_type': "name_allow_zero_min_length"},
{'attribute': 'vnfInstanceName', 'value': 123,
'expected_type': "name_allow_zero_min_length"},
{'attribute': 'vnfInstanceDescription', 'value': True,
'expected_type': 'description'},
{'attribute': 'vnfInstanceDescription', 'value': 123,
'expected_type': 'description'},
)
@ddt.unpack
def test_create_with_invalid_request_body(
self, attribute, value, expected_type):
"""value of attribute in body is of invalid type"""
body = {"vnfInstanceName": "SampleVnf",
"vnfdId": "29c770a3-02bc-4dfc-b4be-eb173ac00567",
"vnfInstanceDescription": "VNF Description"}
req = fake_request.HTTPRequest.blank('/vnf_instances')
body.update({attribute: value})
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
exception = self.assertRaises(
exceptions.ValidationError, self.controller.create,
req, body=body)
if expected_type == 'uuid':
expected_message = ("Invalid input for field/attribute "
"{attribute}. Value: {value}. {value} is not "
"of type 'string'".
format(value=value, attribute=attribute,
expected_type=expected_type))
elif expected_type in ["name_allow_zero_min_length", "description"]:
expected_message = ("Invalid input for field/attribute "
"{attribute}. " "Value: {value}. {value} is "
"not of type 'string'".
format(value=value, attribute=attribute,
expected_type=expected_type))
self.assertEqual(expected_message, exception.msg)
@mock.patch.object(objects.VnfPackageVnfd, 'get_by_id')
def test_create_non_existing_vnf_package_vnfd(self, mock_vnf_by_id):
mock_vnf_by_id.side_effect = exceptions.VnfPackageVnfdNotFound
body = {'vnfdId': uuidsentinel.vnfd_id}
req = fake_request.HTTPRequest.blank('/vnf_instances')
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
self.assertRaises(exc.HTTPBadRequest, self.controller.create, req,
body=body)
def test_create_without_vnfd_id(self):
body = {"vnfInstanceName": "SampleVnfInstance"}
req = fake_request.HTTPRequest.blank(
'/vnf_instances')
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
resp = req.get_response(self.app)
self.assertEqual(http_client.BAD_REQUEST, resp.status_code)
@ddt.data('PATCH', 'PUT', 'HEAD', 'DELETE')
def test_create_not_allowed_http_method(self, method):
"""Wrong HTTP method"""
body = {"vnfdId": uuidsentinel.vnfd_id}
req = fake_request.HTTPRequest.blank('/vnf_instances')
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = method
resp = req.get_response(self.app)
self.assertEqual(http_client.METHOD_NOT_ALLOWED, resp.status_code)
@ddt.data({'name': "A" * 256, 'description': "VNF Description"},
{'name': 'Fake-VNF', 'description': "A" * 1025})
@ddt.unpack
def test_create_max_length_exceeded_for_vnf_name_and_description(
self, name, description):
# vnf instance_name and description with length greater than max
# length defined
body = {"vnfInstanceName": name,
"vnfdId": uuidsentinel.vnfd_id,
"vnfInstanceDescription": description}
req = fake_request.HTTPRequest.blank(
'/vnf_instances')
req.body = jsonutils.dump_as_bytes(body)
req.headers['Content-Type'] = 'application/json'
req.method = 'POST'
resp = req.get_response(self.app)
self.assertEqual(http_client.BAD_REQUEST, resp.status_code)