Support VNF LCM Coordination IF in change_vnfpkg

This patch supports Coordination IF when VNFc is updated by
change_vnfpkg v2 API.

* Modified the coordinateVNF script in the sample package
  to execute the coordination IF.
* Modified change_vnfpkg operation in Functional test
  to execute coordination IF.

Implements: blueprint add-sample-coordinate-script
Change-Id: Ie0c8b30df9d6e8c9ae4a6ba9e894203561475022
This commit is contained in:
Ken Fujimoto 2023-01-31 03:30:16 +00:00
parent d457da69d9
commit a423efca83
12 changed files with 598 additions and 19 deletions

View File

@ -0,0 +1,7 @@
features:
- |
Support the client function of the VNF LCM Coordination API in the
Coordinate VNF script when performing the RollingUpdate with
external management systems in the ChangeCurrentVNFPackage API.
The sample script implements only the client function,
not formal support for the VNF LCM Coordination API itself.

View File

@ -0,0 +1,122 @@
# Copyright (C) 2023 Nippon Telegraph and Telephone Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_log import log as logging
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import http_client
LOG = logging.getLogger(__name__)
DEFAULT_INTERVAL = 5
class CoordinationApiClient(object):
def __init__(self, endpoint, auth_handle):
self.endpoint = endpoint
self.client = http_client.HttpClient(auth_handle)
def create_coordination(self, coord_request):
url = "{}/lcmcoord/v1/coordinations".format(self.endpoint)
resp, body = self.client.do_request(
url, "POST", expected_status=[201, 202, 503], version="1.0.0",
body=coord_request)
return resp, body
def get_coordination(self, coord_id):
url = "{}/lcmcoord/v1/coordinations/{}".format(self.endpoint, coord_id)
resp, body = self.client.do_request(
url, "GET", expected_status=[200, 202], version="1.0.0")
return resp, body
# NOTE: The following are notes on this feature.
# - "cancel" function is not implemented.
# - Only calling from "change_vnfpkg" is assumed via the coordinationVNF
# script.
def create_coordination(endpoint, authentication, coord_request):
# TODO(fuji): auth_handle creation process should create
# a common auth_handle creation process and use it.
if 'OAUTH2_CLIENT_CREDENTIALS' in authentication['authType']:
oauth2_req = authentication.get('paramsOauth2ClientCredentials')
if oauth2_req is None:
msg = "paramsOauth2ClientCredentials must be specified."
raise sol_ex.InvalidSubscription(sol_detail=msg)
auth_handle = http_client.OAuth2AuthHandle(
endpoint, oauth2_req.get('tokenEndpoint'),
oauth2_req.get('clientId'), oauth2_req.get('clientPassword'))
elif 'BASIC' in authentication['authType']:
basic_req = authentication.get('paramsBasic')
if basic_req is None:
msg = "paramsBasic must be specified."
raise sol_ex.InvalidSubscription(sol_detail=msg)
auth_handle = http_client.BasicAuthHandle(
basic_req.get('userName'), basic_req.get('password'))
# TODO(fuji): mTLS will be supported in the future.
else:
msg = "authType is incorrect or not specified."
raise sol_ex.InvalidSubscription(sol_detail=msg)
client = CoordinationApiClient(endpoint, auth_handle)
def _get_retry_after(resp):
if resp.headers.get('Retry-After') is None:
LOG.debug("Retry-After header not included in response. "
"Use DEFAULT_INTERVAL.")
return DEFAULT_INTERVAL
try:
return int(resp.headers.get('Retry-After'))
except ValueError:
# may be HTTP-date format. it is not supported.
# use DEFAULT_INTERVAL
LOG.warning("The value of Retry-After header may be "
"HTTP-date format. It is not supported, "
"use DEFAULT_INTERVAL.")
return DEFAULT_INTERVAL
while (1):
# TODO(fuji): set a timeout for repeated HTTP 503 responses.
resp, body = client.create_coordination(coord_request)
if resp.status_code == 201:
# synchronous mode. done.
return body
elif resp.status_code == 202:
# asynchronous mode.
break
# else: 503
time.sleep(_get_retry_after(resp))
# asynchronous mode.
location = resp.headers.get('Location')
if location is None:
msg = "Location header not included in response."
raise sol_ex.SolException(sol_detail=msg)
coord_id = location.split('/')[-1]
while (1):
# TODO(fuji): set a timeout for repeated HTTP 503 responses.
time.sleep(_get_retry_after(resp))
resp, body = client.get_coordination(coord_id)
if resp.status_code == 200:
# asynchronous mode. done.
return body
# else: 202

View File

@ -268,7 +268,8 @@ class SshIpNotFoundException(SolHttpError404):
class CoordinateVNFExecutionFailed(SolHttpError422):
message = _('CoordinateVNF execution failed.')
title = 'Coordinate VNF execution failed'
# detail set in the code
class VmRunningFailed(SolHttpError422):

View File

@ -28,6 +28,7 @@ from oslo_utils import uuidutils
from tacker.sol_refactored.common import config
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import lcm_op_occ_utils as lcmocc_utils
from tacker.sol_refactored.common import vnf_instance_utils as inst_utils
from tacker.sol_refactored.infra_drivers.openstack import heat_utils
from tacker.sol_refactored.infra_drivers.openstack import nova_utils
@ -491,8 +492,8 @@ class Openstack(object):
heat_client.update_stack(stack_name, update_fields)
# execute coordinate_vnf_script
self._execute_coordinate_vnf_script(req, vnfd, vnfc, heat_client,
is_rollback)
self._execute_coordinate_vnf_script(
req, vnfd, vnfc, inst, grant_req, heat_client, is_rollback)
def _change_vnfpkg_rolling_update_user_data_standard(self, req, inst,
grant_req, grant, vnfd, fields, heat_client, vnfcs, is_rollback):
@ -565,8 +566,8 @@ class Openstack(object):
heat_client.update_stack(stack_name, update_fields)
# execute coordinate_vnf_script
self._execute_coordinate_vnf_script(req, vnfd, vnfc, heat_client,
is_rollback)
self._execute_coordinate_vnf_script(
req, vnfd, vnfc, inst, grant_req, heat_client, is_rollback)
def _get_ssh_ip(self, stack_id, cp_name, heat_client):
# NOTE: It is assumed that if the user want to use floating_ip,
@ -578,8 +579,8 @@ class Openstack(object):
elif cp_info.get('attributes', {}).get('fixed_ips'):
return cp_info['attributes']['fixed_ips'][0].get('ip_address')
def _execute_coordinate_vnf_script(self, req, vnfd, vnfc, heat_client,
is_rollback):
def _execute_coordinate_vnf_script(self, req, vnfd, vnfc, inst, grant_req,
heat_client, is_rollback):
if is_rollback:
script = req.additionalParams.get(
'lcm-operation-coordinate-old-vnf')
@ -605,6 +606,30 @@ class Openstack(object):
vnfc_param['ssh_ip'] = ssh_ip
vnfc_param['is_rollback'] = is_rollback
coord_req = objects.LcmCoordRequest(
vnfInstanceId=inst.id,
vnfLcmOpOccId=grant_req.vnfLcmOpOccId,
lcmOperationType=grant_req.operation,
# NOTE: coordinationActionName is set to the dummy value.
# The value of coordinationActionName must be set in the
# coordinateVNF script.
coordinationActionName="should_be_set_by_script",
_links=objects.LcmCoordRequest_Links(
vnfLcmOpOcc=objects.Link(
href=lcmocc_utils.lcmocc_href(grant_req.vnfLcmOpOccId,
CONF.v2_vnfm.endpoint)),
vnfInstance=objects.Link(
href=inst_utils.inst_href(inst.id,
CONF.v2_vnfm.endpoint))
)
)
vnfc_param['LcmCoordRequest'] = coord_req.to_dict()
vnfc_param['inst'] = inst.to_dict()
for vnfc_info in inst.instantiatedVnfInfo.vnfcInfo:
if vnfc_info.vnfcResourceInfoId == vnfc.id:
vnfc_param['vnfc_info_id'] = vnfc_info.id
break
tmp_csar_dir = vnfd.make_tmp_csar_dir()
script_path = os.path.join(tmp_csar_dir, script)
out = subprocess.run(["python3", script_path],
@ -612,8 +637,9 @@ class Openstack(object):
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
vnfd.remove_tmp_csar_dir(tmp_csar_dir)
if out.returncode != 0:
LOG.error(out)
raise sol_ex.CoordinateVNFExecutionFailed()
LOG.error(str(out.stderr))
raise sol_ex.CoordinateVNFExecutionFailed(
sol_detail=str(out.stderr))
def change_vnfpkg_rollback(self, req, inst, grant_req, grant, vnfd,
lcmocc):

View File

@ -86,6 +86,7 @@ def register_all():
__import__(objects_root + '.v2.lccn_links')
__import__(objects_root + '.v2.lccn_subscription')
__import__(objects_root + '.v2.lccn_subscription_request')
__import__(objects_root + '.v2.lcm_coord_request')
__import__(objects_root + '.v2.lifecycle_change_notifications_filter')
__import__(objects_root + '.v2.modifications_triggered_by_vnf_pkg_change')
__import__(objects_root + '.v2.monitoring_parameter')

View File

@ -0,0 +1,51 @@
# Copyright (C) 2023 Nippon Telegraph and Telephone Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields
from tacker.sol_refactored.objects.v2 import fields as v2fields
# NFV-SOL 002
# - v3.6.1 10.5.2.2 (API version: 1.0.0)
@base.TackerObjectRegistry.register
class LcmCoordRequest(base.TackerPersistentObject,
base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'vnfInstanceId': fields.StringField(nullable=False),
'vnfLcmOpOccId': fields.StringField(nullable=False),
# NOTE: its type is LcmOperationForCoordType according to the
# specification, but it is same as LcmOperationType.
'lcmOperationType': v2fields.LcmOperationTypeField(nullable=False),
'coordinationActionName': fields.StringField(nullable=False),
'inputParams': fields.KeyValuePairsField(nullable=True),
'_links': fields.ObjectField('LcmCoordRequest_Links', nullable=False)
}
@base.TackerObjectRegistry.register
class LcmCoordRequest_Links(base.TackerObject):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'vnfLcmOpOcc': fields.ObjectField('Link', nullable=False),
'vnfInstance': fields.ObjectField('Link', nullable=False)
}

View File

@ -24,8 +24,8 @@ def handle_notification(environ, start_response):
method = environ['REQUEST_METHOD']
print("notification %s" % method)
if method not in ['GET', 'POST']:
print(" not support method")
start_response('405 not suportted method',
print(" not supported method")
start_response('405 not supported method',
[('Content-Type', 'application/problem+json')])
problem_detail = {'status': 405,
'detail': "not supported method"}
@ -34,7 +34,7 @@ def handle_notification(environ, start_response):
authorization = environ.get("HTTP_AUTHORIZATION", "")
version = environ.get("HTTP_VERSION", "")
print(" authorizarion: %s" % authorization)
print(" authorization: %s" % authorization)
print(" version: %s" % version)
if method == 'POST':
@ -52,8 +52,8 @@ def handle_token(environ, start_response):
method = environ['REQUEST_METHOD']
print("token %s" % method)
if method not in ['POST']:
print(" not support method")
start_response('405 not suportted method',
print(" not supported method")
start_response('405 not supported method',
[('Content-Type', 'application/problem+json')])
problem_detail = {'status': 405,
'detail': "not supported method"}
@ -63,7 +63,7 @@ def handle_token(environ, start_response):
authorization = environ.get("HTTP_AUTHORIZATION", "")
version = environ.get("HTTP_VERSION", "")
content_type = environ.get("CONTENT_TYPE")
print(" authorizarion: %s" % authorization)
print(" authorization: %s" % authorization)
print(" version: %s" % version)
print(" content_type: %s" % content_type)
@ -91,6 +91,44 @@ def handle_token(environ, start_response):
return [body.encode('utf-8')]
def handle_coordinations(environ, start_response):
method = environ['REQUEST_METHOD']
print("coordinations %s" % method)
if method not in ['POST']:
print(" not supported method")
start_response('405 not supported method',
[('Content-Type', 'application/problem+json')])
problem_detail = {'status': 405,
'detail': "not supported method"}
body = json.dumps(problem_detail)
return [body.encode('utf-8')]
authorization = environ.get("HTTP_AUTHORIZATION", "")
version = environ.get("HTTP_VERSION", "")
print(" authorization: %s" % authorization)
print(" version: %s" % version)
length = environ.get('CONTENT_LENGTH')
print(" content_length: %s" % length)
body = environ.get('wsgi.input').read(int(length))
body = json.loads(body.decode('utf-8'))
print(" request body: %s" % body)
start_response('201 Created', [('Content-Type', 'application/json')])
data = {
"id": "2e11d0cb-8cb1-4418-926c-5e31f0a2538b",
"coordinationResult": "CONTINUE",
"vnfInstanceId": body.get('vnfInstanceId'),
"vnfLcmOpOccId": body.get('vnfLcmOpOccId'),
"lcmOperationType": body.get('lcmOperationType'),
"coordinationActionName": body.get('coordinationActionName'),
"_links": body.get('_links')
}
body = json.dumps(data)
print(" response body: %s" % body)
return [body.encode('utf-8')]
def notif_endpoint_app(environ, start_response):
path = environ['PATH_INFO']
@ -100,6 +138,9 @@ def notif_endpoint_app(environ, start_response):
if path == "/token":
return handle_token(environ, start_response)
if path == "/lcmcoord/v1/coordinations":
return handle_coordinations(environ, start_response)
if __name__ == '__main__':
try:

View File

@ -16,10 +16,26 @@
import os
import time
from tacker.tests.functional.sol_v2_common import base_v2
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import test_vnflcm_basic_common
def create_coordinate_response(req_header, req_body):
resp_body = {
'id': 'aeca5328-085c-4cd6-a6f4-c010e9082528',
'coordinationResult': 'CONTINUE',
'vnfInstanceId': req_body.get('vnfInstanceId'),
'vnfLcmOpOccId': req_body.get('vnfLcmOpOccId'),
'lcmOperationType': req_body.get('lcmOperationType'),
'coordinationActionName': req_body.get('coordinationActionName'),
'_links': req_body.get('_links')
}
return resp_body
class IndividualVnfcMgmtTest(test_vnflcm_basic_common.CommonVnfLcmTest):
@classmethod
@ -308,12 +324,32 @@ class IndividualVnfcMgmtTest(test_vnflcm_basic_common.CommonVnfLcmTest):
# 6. Change_vnfpkg operation
change_vnfpkg_req = paramgen.sample4_change_vnfpkg(self.vnfd_id_2,
net_ids, subnet_ids)
for vdu_param in change_vnfpkg_req['additionalParams']['vdu_params']:
vdu_param['new_vnfc_param']['endpoint'] = (
f'http://localhost:{base_v2.FAKE_SERVER_MANAGER.SERVER_PORT}')
# Prepare coordination
base_v2.FAKE_SERVER_MANAGER.set_callback(
'POST',
'/lcmcoord/v1/coordinations',
status_code=201,
response_headers={"Content-Type": "application/json"},
callback=create_coordinate_response
)
with open('/tmp/change_vnfpkg_coordination', 'w'):
pass
# execute Change_vnfpkg operation
resp, body = self.change_vnfpkg(inst_id, change_vnfpkg_req)
self.assertEqual(202, resp.status_code)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
os.remove('/tmp/change_vnfpkg_coordination')
# Show VNF instance
resp, inst_6 = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
@ -459,8 +495,24 @@ class IndividualVnfcMgmtTest(test_vnflcm_basic_common.CommonVnfLcmTest):
# 3. Change_vnfpkg operation
self._put_fail_file('change_vnfpkg')
with open('/tmp/change_vnfpkg_coordination', 'w'):
pass
change_vnfpkg_req = paramgen.sample4_change_vnfpkg(self.vnfd_id_2,
net_ids, subnet_ids)
for vdu_param in change_vnfpkg_req['additionalParams']['vdu_params']:
vdu_param['old_vnfc_param']['endpoint'] = (
f'http://localhost:{base_v2.FAKE_SERVER_MANAGER.SERVER_PORT}')
# Prepare coordination
base_v2.FAKE_SERVER_MANAGER.set_callback(
'POST',
'/lcmcoord/v1/coordinations',
status_code=201,
response_headers={"Content-Type": "application/json"},
callback=create_coordinate_response
)
resp, body = self.change_vnfpkg(inst_id, change_vnfpkg_req)
self.assertEqual(202, resp.status_code)
@ -473,6 +525,8 @@ class IndividualVnfcMgmtTest(test_vnflcm_basic_common.CommonVnfLcmTest):
self.assertEqual(202, resp.status_code)
self.wait_lcmocc_rolled_back(lcmocc_id)
os.remove('/tmp/change_vnfpkg_coordination')
# Show VNF instance
resp, inst_3 = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)

View File

@ -1164,12 +1164,28 @@ def sample4_change_vnfpkg(vnfd_id, net_ids, subnet_ids):
"old_vnfc_param": {
"cp_name": "VDU1_CP1",
"username": "ubuntu",
"password": "ubuntu"
"password": "ubuntu",
"endpoint": "http://127.0.0.1:6789",
"authentication": {
"authType": ["BASIC"],
"paramsBasic": {
"userName": "tacker",
"password": "tacker"
}
}
},
"new_vnfc_param": {
"cp_name": "VDU1_CP1",
"username": "ubuntu",
"password": "ubuntu"
"password": "ubuntu",
"endpoint": "http://127.0.0.1:6789",
"authentication": {
"authType": ["BASIC"],
"paramsBasic": {
"userName": "tacker",
"password": "tacker"
}
}
}
},
{
@ -1177,12 +1193,28 @@ def sample4_change_vnfpkg(vnfd_id, net_ids, subnet_ids):
"old_vnfc_param": {
"cp_name": "VDU2_CP1",
"username": "ubuntu",
"password": "ubuntu"
"password": "ubuntu",
"endpoint": "http://127.0.0.1:6789",
"authentication": {
"authType": ["BASIC"],
"paramsBasic": {
"userName": "tacker",
"password": "tacker"
}
}
},
"new_vnfc_param": {
"cp_name": "VDU2_CP1",
"username": "ubuntu",
"password": "ubuntu"
"password": "ubuntu",
"endpoint": "http://127.0.0.1:6789",
"authentication": {
"authType": ["BASIC"],
"paramsBasic": {
"userName": "tacker",
"password": "tacker"
}
}
}
}
],

View File

@ -17,6 +17,8 @@ import os
import pickle
import sys
from tacker.sol_refactored.common import coord_client
class FailScript(object):
def __init__(self, vnfc_param):
@ -30,10 +32,42 @@ class FailScript(object):
raise Exception(f'test {operation} error')
class CoordScript(object):
def __init__(self, vnfc_param):
self.vnfc_param = vnfc_param
def run(self):
if not os.path.exists('/tmp/change_vnfpkg_coordination'):
return
coord_req = self.vnfc_param['LcmCoordRequest']
coord_req['coordinationActionName'] = (
"prv.tacker_organization.coordination_test")
endpoint = self.vnfc_param.get('endpoint')
authentication = self.vnfc_param.get('authentication')
input_params = self.vnfc_param.get('inputParams')
if input_params is not None:
coord_req['inputParams'] = input_params
if endpoint is None:
raise Exception('endpoint must be specified.')
if authentication is None:
raise Exception('authentication must be specified.')
coord = coord_client.create_coordination(endpoint, authentication,
coord_req)
if coord['coordinationResult'] != "CONTINUE":
raise Exception(
f"coordinationResult is {coord['coordinationResult']}")
def main():
vnfc_param = pickle.load(sys.stdin.buffer)
script = FailScript(vnfc_param)
script.run()
script = CoordScript(vnfc_param)
script.run()
if __name__ == "__main__":

View File

@ -17,6 +17,8 @@ import os
import pickle
import sys
from tacker.sol_refactored.common import coord_client
class FailScript(object):
def __init__(self, vnfc_param):
@ -30,10 +32,42 @@ class FailScript(object):
raise Exception(f'test {operation} error')
class CoordScript(object):
def __init__(self, vnfc_param):
self.vnfc_param = vnfc_param
def run(self):
if not os.path.exists('/tmp/change_vnfpkg_coordination'):
return
coord_req = self.vnfc_param['LcmCoordRequest']
coord_req['coordinationActionName'] = (
"prv.tacker_organization.coordination_test")
endpoint = self.vnfc_param.get('endpoint')
authentication = self.vnfc_param.get('authentication')
input_params = self.vnfc_param.get('inputParams')
if input_params is not None:
coord_req['inputParams'] = input_params
if endpoint is None:
raise Exception('endpoint must be specified.')
if authentication is None:
raise Exception('authentication must be specified.')
coord = coord_client.create_coordination(endpoint, authentication,
coord_req)
if coord['coordinationResult'] != "CONTINUE":
raise Exception(
f"coordinationResult is {coord['coordinationResult']}")
def main():
vnfc_param = pickle.load(sys.stdin.buffer)
script = FailScript(vnfc_param)
script.run()
script = CoordScript(vnfc_param)
script.run()
if __name__ == "__main__":

View File

@ -0,0 +1,176 @@
# Copyright (C) 2023 Nippon Telegraph and Telephone Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import requests
from unittest import mock
from tacker.sol_refactored.common import coord_client
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import http_client
from tacker.tests import base
endpoint = 'http://127.0.0.1:6789'
coord_req_example = {
'vnfInstanceId': 'b18a8a15-8973-4202-a2f0-a67a109fc461',
'vnfLcmOpOccId': '2cae986e-7fea-4aeb-9b22-f81b35800838',
'lcmOperationType': 'CHANGE_VNFPKG',
'_links': {
'vnfLcmOpOcc': {'href': 'http://127.0.0.1:9890/vnflcm/v2/'
'vnf_lcm_op_occs/'
'2cae986e-7fea-4aeb-9b22-f81b35800838'},
'vnfInstance': {'href': 'http://127.0.0.1:9890/vnflcm/v2/'
'vnf_instances/'
'b18a8a15-8973-4202-a2f0-a67a109fc461'}
},
'coordinationActionName': 'prv.tacker_organization.coordination_test'
}
resp_body = {
'id': '2e11d0cb-8cb1-4418-926c-5e31f0a2538b',
'coordinationResult': 'CONTINUE',
'vnfInstanceId': 'b18a8a15-8973-4202-a2f0-a67a109fc461',
'vnfLcmOpOccId': '2cae986e-7fea-4aeb-9b22-f81b35800838',
'lcmOperationType': 'CHANGE_VNFPKG',
'coordinationActionName': 'prv.tacker_organization.coordination_test',
'_links': {
'vnfLcmOpOcc': {'href': 'http://127.0.0.1:9890/vnflcm/v2/'
'vnf_lcm_op_occs/'
'2cae986e-7fea-4aeb-9b22-f81b35800838'},
'vnfInstance': {'href': 'http://127.0.0.1:9890/vnflcm/v2/'
'vnf_instances/'
'b18a8a15-8973-4202-a2f0-a67a109fc461'}
}
}
class TestCoordClient(base.BaseTestCase):
@mock.patch.object(http_client.HttpClient, 'do_request')
def test_create_coordination_synchronous(self, mock_resp):
authentication = {
"authType": ["BASIC"],
"paramsBasic": {
"user": "user",
"password": "password"
}
}
resp = requests.Response()
resp.status_code = 201
mock_resp.return_value = (resp, resp_body)
body = coord_client.create_coordination(endpoint, authentication,
coord_req_example)
self.assertEqual(resp_body, body)
@mock.patch.object(http_client.HttpClient, 'do_request')
def test_create_coordination_synchronous_retry(self, mock_resp):
authentication = {
"authType": ["BASIC"],
"paramsBasic": {
"user": "user",
"password": "password"
}
}
resp_1 = requests.Response()
resp_1.status_code = 503
resp_1.headers['Retry-After'] = "1"
resp_2 = requests.Response()
resp_2.status_code = 201
mock_resp.side_effect = [(resp_1, None), (resp_2, resp_body)]
body = coord_client.create_coordination(endpoint, authentication,
coord_req_example)
self.assertEqual(2, mock_resp.call_count)
self.assertEqual(resp_body, body)
@mock.patch.object(http_client.HttpClient, 'do_request')
def test_create_coordination_asynchronous(self, mock_resp):
authentication = {
"authType": ["BASIC"],
"paramsBasic": {
"user": "user",
"password": "password"
}
}
resp_1 = requests.Response()
resp_1.status_code = 202
resp_1.headers['Location'] = ("http://127.0.0.1:6789/"
"lcmcoord/v1/coordinations/"
"b18a8a15-8973-4202-a2f0-a67a109fc461")
resp_2 = requests.Response()
resp_2.status_code = 202
resp_2.headers['Location'] = ("http://127.0.0.1:6789/"
"lcmcoord/v1/coordinations/"
"b18a8a15-8973-4202-a2f0-a67a109fc461")
resp_2.headers['Retry-After'] = "1"
resp_3 = requests.Response()
resp_3.status_code = 200
mock_resp.side_effect = [(resp_1, None), (resp_2, None),
(resp_3, resp_body)]
body = coord_client.create_coordination(endpoint, authentication,
coord_req_example)
self.assertEqual(3, mock_resp.call_count)
self.assertEqual(resp_body, body)
def test_create_coordination_invalid_authentication(self):
authentication = {
"authType": ["OAUTH2_CLIENT_CREDENTIALS"]
}
ex = self.assertRaises(sol_ex.InvalidSubscription,
coord_client.create_coordination, endpoint, authentication,
coord_req_example)
expected_message = "paramsOauth2ClientCredentials must be specified."
self.assertEqual(expected_message, ex.detail)
authentication = {
"authType": ["BASIC"]
}
ex = self.assertRaises(sol_ex.InvalidSubscription,
coord_client.create_coordination, endpoint, authentication,
coord_req_example)
expected_message = "paramsBasic must be specified."
self.assertEqual(expected_message, ex.detail)
@mock.patch.object(http_client.HttpClient, 'do_request')
def test_create_coordination_no_location_header(self, mock_resp):
authentication = {
"authType": ["BASIC"],
"paramsBasic": {
"user": "user",
"password": "password"
}
}
resp = requests.Response()
resp.status_code = 202
mock_resp.return_value = (resp, None)
ex = self.assertRaises(sol_ex.SolException,
coord_client.create_coordination, endpoint, authentication,
coord_req_example)
expected_message = "Location header not included in response."
self.assertEqual(expected_message, ex.detail)