[WIP] Support Coordination API in change_vnfpkg

This patch supports Coordination API when VNFc are updated by
change_vnfpkg v2 API.

Change-Id: Ie0c8b30df9d6e8c9ae4a6ba9e894203561475022
This commit is contained in:
Ken Fujimoto 2023-01-31 03:30:16 +00:00
parent cb069e5dbb
commit 402ff6f2f7
10 changed files with 526 additions and 13 deletions

View File

@ -0,0 +1,115 @@
# Copyright (C) 2023 Nippon Telegraph and Telephone Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_log import log as logging
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import http_client
LOG = logging.getLogger(__name__)
DEFAULT_INTERVAL = 5
class CoordinationApiClient(object):
def __init__(self, endpoint, auth_handle):
self.endpoint = endpoint
self.client = http_client.HttpClient(auth_handle)
def create_coordination(self, coord_request):
url = "{}/lcmcoord/v1/coordinations".format(self.endpoint)
resp, body = self.client.do_request(
url, "POST", expected_status=[201, 202, 503], version="1.0.0",
body=coord_request)
return resp, body
def get_coordination(self, coord_id):
url = "{}/lcmcoord/v1/coordinations/{}".format(self.endpoint, coord_id)
resp, body = self.client.do_request(
url, "GET", expected_status=[200, 202], version="1.0.0")
return resp, body
def create_coordination(endpoint, auth_handle, coord_request):
if 'BASIC' in auth_handle['authType']:
basic_req = auth_handle.get('paramsBasic')
if basic_req is None:
msg = "ParamsBasic must be specified."
raise sol_ex.InvalidSubscription(sol_detail=msg)
auth = http_client.BasicAuthHandle(basic_req.get('userName'),
basic_req.get('password'))
elif 'OAUTH2_CLIENT_CREDENTIALS' in auth_handle['authType']:
oauth2_req = auth_handle.get('paramsOauth2ClientCredentials')
if oauth2_req is None:
msg = "paramsOauth2ClientCredentials must be specified."
raise sol_ex.InvalidSubscription(sol_detail=msg)
auth = http_client.OAuth2AuthHandle(endpoint,
oauth2_req.get('tokenEndpoint'),
oauth2_req.get('clientId'),
oauth2_req.get('clientPassword'))
# Add mTLS later
else:
msg = "authType is incorrect or not specifiled."
raise sol_ex.InvalidSubscription(sol_detail=msg)
client = CoordinationApiClient(endpoint, auth)
def _get_retry_after(resp):
if resp.headers.get('Retry-After') is None:
LOG.warning("Retry-After header not included in response. "
"Use DEFAULT_INTERVAL.")
return DEFAULT_INTERVAL
try:
return int(resp.headers.get('Retry-After'))
except ValueError:
# may be HTTP-date format. it is not supported.
# use DEFAULT_INTERVAL
return DEFAULT_INTERVAL
while (1):
resp, body = client.create_coordination(coord_request)
if resp.status_code == 201:
# syncronous mode. done.
return body
elif resp.status_code == 202:
# asyncronous mode.
break
# else: 503
# get retry-after
time.sleep(_get_retry_after(resp))
# asyncronous mode.
location = resp.headers.get('Location')
if location is None:
msg = "Location header not included in response."
raise sol_ex.SolException(sol_detail=msg)
coord_id = location.split('/')[-1]
while (1):
# get retry-after if exist else some value
time.sleep(_get_retry_after(resp))
resp, body = client.get_coordination(coord_id)
if resp.status_code == 200:
# done.
return body
# else: 202

View File

@ -268,7 +268,8 @@ class SshIpNotFoundException(SolHttpError404):
class CoordinateVNFExecutionFailed(SolHttpError422):
message = _('CoordinateVNF execution failed.')
title = 'Coordinate VNF execution failed'
# detail set in the code
class VmRunningFailed(SolHttpError422):

View File

@ -28,6 +28,7 @@ from oslo_utils import uuidutils
from tacker.sol_refactored.common import config
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import lcm_op_occ_utils as lcmocc_utils
from tacker.sol_refactored.common import vnf_instance_utils as inst_utils
from tacker.sol_refactored.infra_drivers.openstack import heat_utils
from tacker.sol_refactored.infra_drivers.openstack import nova_utils
@ -491,8 +492,8 @@ class Openstack(object):
heat_client.update_stack(stack_name, update_fields)
# execute coordinate_vnf_script
self._execute_coordinate_vnf_script(req, vnfd, vnfc, heat_client,
is_rollback)
self._execute_coordinate_vnf_script(
req, vnfd, vnfc, inst, grant_req, heat_client, is_rollback)
def _change_vnfpkg_rolling_update_user_data_standard(self, req, inst,
grant_req, grant, vnfd, fields, heat_client, vnfcs, is_rollback):
@ -565,8 +566,8 @@ class Openstack(object):
heat_client.update_stack(stack_name, update_fields)
# execute coordinate_vnf_script
self._execute_coordinate_vnf_script(req, vnfd, vnfc, heat_client,
is_rollback)
self._execute_coordinate_vnf_script(
req, vnfd, vnfc, inst, grant_req, heat_client, is_rollback)
def _get_ssh_ip(self, stack_id, cp_name, heat_client):
# NOTE: It is assumed that if the user want to use floating_ip,
@ -578,8 +579,8 @@ class Openstack(object):
elif cp_info.get('attributes', {}).get('fixed_ips'):
return cp_info['attributes']['fixed_ips'][0].get('ip_address')
def _execute_coordinate_vnf_script(self, req, vnfd, vnfc, heat_client,
is_rollback):
def _execute_coordinate_vnf_script(self, req, vnfd, vnfc, inst, grant_req,
heat_client, is_rollback):
if is_rollback:
script = req.additionalParams.get(
'lcm-operation-coordinate-old-vnf')
@ -605,6 +606,24 @@ class Openstack(object):
vnfc_param['ssh_ip'] = ssh_ip
vnfc_param['is_rollback'] = is_rollback
coord_req = objects.LcmCoordRequest(
vnfInstanceId=inst.id,
vnfLcmOpOccId=grant_req.vnfLcmOpOccId,
lcmOperationType=grant_req.operation,
coordinationActionaName=None,
_links=objects.LcmCoordRequest_Links(
vnfLcmOpOcc=objects.Link(
href=lcmocc_utils.lcmocc_href(grant_req.vnfLcmOpOccId,
CONF.v2_vnfm.endpoint)),
vnfInstance=objects.Link(
href=inst_utils.inst_href(inst.id,
CONF.v2_vnfm.endpoint))
)
)
vnfc_param['LcmCoordRequest'] = coord_req.to_dict()
vnfc_param['inst'] = inst.to_dict()
vnfc_param['vnfc_info_id'] = vnfc.id
tmp_csar_dir = vnfd.make_tmp_csar_dir()
script_path = os.path.join(tmp_csar_dir, script)
out = subprocess.run(["python3", script_path],
@ -612,8 +631,9 @@ class Openstack(object):
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
vnfd.remove_tmp_csar_dir(tmp_csar_dir)
if out.returncode != 0:
LOG.error(out)
raise sol_ex.CoordinateVNFExecutionFailed()
msg = f'{out.stderr}'
LOG.error(msg)
raise sol_ex.CoordinateVNFExecutionFailed(sol_detail=msg)
def change_vnfpkg_rollback(self, req, inst, grant_req, grant, vnfd,
lcmocc):

View File

@ -86,6 +86,7 @@ def register_all():
__import__(objects_root + '.v2.lccn_links')
__import__(objects_root + '.v2.lccn_subscription')
__import__(objects_root + '.v2.lccn_subscription_request')
__import__(objects_root + '.v2.lcm_coord_request')
__import__(objects_root + '.v2.lifecycle_change_notifications_filter')
__import__(objects_root + '.v2.modifications_triggered_by_vnf_pkg_change')
__import__(objects_root + '.v2.monitoring_parameter')

View File

@ -0,0 +1,51 @@
# Copyright (C) 2023 Nippon Telegraph and Telephone Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.objects import base
from tacker.sol_refactored.objects import fields
from tacker.sol_refactored.objects.v2 import fields as v2fields
# NFV-SOL 002
# - v3.6.1 10.5.2.2 (API version: 1.0.0)
@base.TackerObjectRegistry.register
class LcmCoordRequest(base.TackerPersistentObject,
base.TackerObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'vnfInstanceId': fields.StringField(nullable=False),
'vnfLcmOpOccId': fields.StringField(nullable=False),
# NOTE: its type is LcmOperationForCoordType according to the
# specification. but it is same as LcmOperationType.
'lcmOperationType': v2fields.LcmOperationTypeField(nullable=False),
'coordinationActionName': fields.StringField(nullable=False),
'inputParams': fields.KeyValuePairsField(nullable=True),
'_links': fields.ObjectField('LcmCoordRequest_Links', nullable=False)
}
@base.TackerObjectRegistry.register
class LcmCoordRequest_Links(base.TackerObject):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'vnfLcmOpOcc': fields.ObjectField('Link', nullable=True),
'vnfInstance': fields.ObjectField('Link', nullable=True)
}

View File

@ -91,6 +91,44 @@ def handle_token(environ, start_response):
return [body.encode('utf-8')]
def handle_coordinations(environ, start_response):
method = environ['REQUEST_METHOD']
print("coordinations %s" % method)
if method not in ['POST']:
print(" not support method")
start_response('405 not suportted method',
[('Content-Type', 'application/problem+json')])
problem_detail = {'status': 405,
'detail': "not supported method"}
body = json.dumps(problem_detail)
return [body.encode('utf-8')]
authorization = environ.get("HTTP_AUTHORIZATION", "")
version = environ.get("HTTP_VERSION", "")
print(" authorizarion: %s" % authorization)
print(" version: %s" % version)
length = environ.get('CONTENT_LENGTH')
print(" content_length: %s" % length)
body = environ.get('wsgi.input').read(int(length))
body = json.loads(body.decode('utf-8'))
print(" request body: %s" % body)
start_response('201 Created', [('Content-Type', 'application/json')])
data = {
"id": "2e11d0cb-8cb1-4418-926c-5e31f0a2538b",
"coordinationResult": "CONTINUE",
"vnfInstanceId": body.get('vnfInstanceId'),
"vnfLcmOpOccId": body.get('vnfLcmOpOccId'),
"lcmOperationType": body.get('lcmOperationType'),
"coordinationActionName": body.get('coordinationActionName'),
"_links": body.get('_links')
}
body = json.dumps(data)
print(" response body: %s" % body)
return [body.encode('utf-8')]
def notif_endpoint_app(environ, start_response):
path = environ['PATH_INFO']
@ -100,6 +138,9 @@ def notif_endpoint_app(environ, start_response):
if path == "/token":
return handle_token(environ, start_response)
if path == "/lcmcoord/v1/coordinations":
return handle_coordinations(environ, start_response)
if __name__ == '__main__':
try:

View File

@ -16,10 +16,32 @@
import os
import time
from tacker.tests.functional.sol_v2_common import base_v2
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import test_vnflcm_basic_common
def create_coordinate_response(req_header, req_body):
inst_id = req_body.get('vnfInstanceId')
lcmocc_id = req_body.get('vnfLcmOpOccId')
operation = req_body.get('lcmOperationType')
coordinationActionName = req_body.get('coordinationActionName')
links = req_body.get('_links')
resp_body = {
'id': 'aeca5328-085c-4cd6-a6f4-c010e9082528',
'coordinationResult': 'CONTINUE',
'vnfInstanceId': inst_id,
'vnfLcmOpOccId': lcmocc_id,
'lcmOperationType': operation,
'coordinationActionName': coordinationActionName,
'_links': links
}
return resp_body
class IndividualVnfcMgmtTest(test_vnflcm_basic_common.CommonVnfLcmTest):
@classmethod
@ -294,12 +316,32 @@ class IndividualVnfcMgmtTest(test_vnflcm_basic_common.CommonVnfLcmTest):
# 6. Change_vnfpkg operation
change_vnfpkg_req = paramgen.sample4_change_vnfpkg(self.vnfd_id_2,
net_ids, subnet_ids)
for vdu_param in change_vnfpkg_req['additionalParams']['vdu_params']:
vdu_param['new_vnfc_param']['endpoint'] = (
f'http://localhost:{base_v2.FAKE_SERVER_MANAGER.SERVER_PORT}')
# Prepare coordination
base_v2.FAKE_SERVER_MANAGER.set_callback(
'POST',
'/lcmcoord/v1/coordinations',
status_code=201,
response_headers={"Content-Type": "application/json"},
callback=create_coordinate_response
)
with open('/tmp/change_vnfpkg_coordination', 'w'):
pass
# execute Change_vnfpkg operation
resp, body = self.change_vnfpkg(inst_id, change_vnfpkg_req)
self.assertEqual(202, resp.status_code)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
os.remove('/tmp/change_vnfpkg_coordination')
# Show VNF instance
resp, inst_6 = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)

View File

@ -1158,12 +1158,28 @@ def sample4_change_vnfpkg(vnfd_id, net_ids, subnet_ids):
"old_vnfc_param": {
"cp_name": "VDU1_CP1",
"username": "ubuntu",
"password": "ubuntu"
"password": "ubuntu",
"endpoint": "http://127.0.0.1:6789",
"authentication": {
"authType": ["BASIC"],
"paramsBasic": {
"userName": "tacker",
"password": "tacker"
}
}
},
"new_vnfc_param": {
"cp_name": "VDU1_CP1",
"username": "ubuntu",
"password": "ubuntu"
"password": "ubuntu",
"endpoint": "http://127.0.0.1:6789",
"authentication": {
"authType": ["BASIC"],
"paramsBasic": {
"userName": "tacker",
"password": "tacker"
}
}
}
},
{
@ -1171,12 +1187,28 @@ def sample4_change_vnfpkg(vnfd_id, net_ids, subnet_ids):
"old_vnfc_param": {
"cp_name": "VDU2_CP1",
"username": "ubuntu",
"password": "ubuntu"
"password": "ubuntu",
"endpoint": "http://127.0.0.1:6789",
"authentication": {
"authType": ["BASIC"],
"paramsBasic": {
"userName": "tacker",
"password": "tacker"
}
}
},
"new_vnfc_param": {
"cp_name": "VDU2_CP1",
"username": "ubuntu",
"password": "ubuntu"
"password": "ubuntu",
"endpoint": "http://127.0.0.1:6789",
"authentication": {
"authType": ["BASIC"],
"paramsBasic": {
"userName": "tacker",
"password": "tacker"
}
}
}
}
],

View File

@ -17,6 +17,8 @@ import os
import pickle
import sys
from tacker.sol_refactored.common import coord_client
class FailScript(object):
def __init__(self, vnfc_param):
@ -30,10 +32,41 @@ class FailScript(object):
raise Exception(f'test {operation} error')
class CoordScript(object):
def __init__(self, vnfc_param):
self.vnfc_param = vnfc_param
def run(self):
if not os.path.exists('/tmp/change_vnfpkg_coordination'):
return
coord_req = self.vnfc_param['LcmCoordRequest']
coord_req['coordinationActionName'] = "prv.ntt.coordination_test"
endpoint = self.vnfc_param.get('endpoint')
authentication = self.vnfc_param.get('authentication')
input_params = self.vnfc_param.get('inputParams')
if input_params is not None:
coord_req['inputParams'] = input_params
if endpoint is None:
raise Exception('endpoint must be specified.')
if authentication is None:
raise Exception('authentication must be specified.')
coord = coord_client.create_coordination(endpoint, authentication,
coord_req)
if coord['coordinationResult'] != "CONTINUE":
raise Exception(
f"coordinationResult is {coord['coordinationResult']}")
def main():
vnfc_param = pickle.load(sys.stdin.buffer)
script = FailScript(vnfc_param)
script.run()
script = CoordScript(vnfc_param)
script.run()
if __name__ == "__main__":

View File

@ -0,0 +1,177 @@
# Copyright (C) 2023 Nippon Telegraph and Telephone Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import requests
from unittest import mock
from tacker.sol_refactored.common import coord_client
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import http_client
from tacker.tests import base
endpoint = 'http://127.0.0.1:6789'
coord_req_example = {
'vnfInstanceId': 'b18a8a15-8973-4202-a2f0-a67a109fc461',
'vnfLcmOpOccId': '2cae986e-7fea-4aeb-9b22-f81b35800838',
'lcmOperationType': 'CHANGE_VNFPKG',
'_links': {
'vnfLcmOpOcc': {'href': 'http://127.0.0.1:9890/vnflcm/v2/'
'vnf_lcm_op_occs/'
'2cae986e-7fea-4aeb-9b22-f81b35800838'},
'vnfInstance': {'href': 'http://127.0.0.1:9890/vnflcm/v2/'
'vnf_instances/'
'b18a8a15-8973-4202-a2f0-a67a109fc461'}
},
'coordinationActionName': 'prv.ntt.coordination_test'
}
resp_body = {
'id': '2e11d0cb-8cb1-4418-926c-5e31f0a2538b',
'coordinationResult': 'CONTINUE',
'vnfInstanceId': 'b18a8a15-8973-4202-a2f0-a67a109fc461',
'vnfLcmOpOccId': '2cae986e-7fea-4aeb-9b22-f81b35800838',
'lcmOperationType': 'CHANGE_VNFPKG',
'coordinationActionName': 'prv.ntt.coordination_test',
'_links': {
'vnfLcmOpOcc': {'href': 'http://127.0.0.1:9890/vnflcm/v2/'
'vnf_lcm_op_occs/'
'2cae986e-7fea-4aeb-9b22-f81b35800838'},
'vnfInstance': {'href': 'http://127.0.0.1:9890/vnflcm/v2/'
'vnf_instances/'
'b18a8a15-8973-4202-a2f0-a67a109fc461'}
}
}
class TestCoordClient(base.BaseTestCase):
@mock.patch.object(http_client.HttpClient, 'do_request')
def test_create_coordination_syncronous(self, mock_resp):
authentication = {
"authType": ["BASIC"],
"paramsBasic": {
"user": "user",
"password": "password"
}
}
resp = requests.Response()
resp.status_code = 201
mock_resp.return_value = (resp, resp_body)
body = coord_client.create_coordination(endpoint, authentication,
coord_req_example)
self.assertEqual(resp_body, body)
@mock.patch.object(http_client.HttpClient, 'do_request')
def test_create_coordination_syncronous_retry(self, mock_resp):
authentication = {
"authType": ["BASIC"],
"paramsBasic": {
"user": "user",
"password": "password"
}
}
resp_1 = requests.Response()
resp_1.status_code = 503
resp_1.headers['Retry-After'] = "1"
resp_2 = requests.Response()
resp_2.status_code = 201
mock_resp.side_effect = [(resp_1, None), (resp_2, resp_body)]
body = coord_client.create_coordination(endpoint, authentication,
coord_req_example)
self.assertEqual(2, mock_resp.call_count)
self.assertEqual(resp_body, body)
@mock.patch.object(http_client.HttpClient, 'do_request')
def test_create_coordination_asyncronous(self, mock_resp):
authentication = {
"authType": ["BASIC"],
"paramsBasic": {
"user": "user",
"password": "password"
}
}
resp_1 = requests.Response()
resp_1.status_code = 202
resp_1.headers['Location'] = ("http://127.0.0.1:6789/"
"lcmcoord/v1/coordinations/"
"b18a8a15-8973-4202-a2f0-a67a109fc461")
resp_2 = requests.Response()
resp_2.status_code = 202
resp_2.headers['Location'] = ("http://127.0.0.1:6789/"
"lcmcoord/v1/coordinations/"
"b18a8a15-8973-4202-a2f0-a67a109fc461")
resp_2.headers['Retry-After'] = "1"
resp_3 = requests.Response()
resp_3.status_code = 200
mock_resp.side_effect = [(resp_1, None), (resp_2, None),
(resp_3, resp_body)]
body = coord_client.create_coordination(endpoint, authentication,
coord_req_example)
self.assertEqual(3, mock_resp.call_count)
self.assertEqual(resp_body, body)
def test_create_coordination_invalid_authentication(self):
authentication = {
"authType": ["BASIC"]
}
coord_req = {}
ex = self.assertRaises(sol_ex.InvalidSubscription,
coord_client.create_coordination, endpoint, authentication,
coord_req)
expected_message = "ParamsBasic must be specified."
self.assertEqual(expected_message, ex.detail)
authentication = {
"authType": ["OAUTH2_CLIENT_CREDENTIALS"]
}
ex = self.assertRaises(sol_ex.InvalidSubscription,
coord_client.create_coordination, endpoint, authentication,
coord_req_example)
expected_message = "paramsOauth2ClientCredentials must be specified."
self.assertEqual(expected_message, ex.detail)
@mock.patch.object(http_client.HttpClient, 'do_request')
def test_create_coordination_no_location_header(self, mock_resp):
authentication = {
"authType": ["BASIC"],
"paramsBasic": {
"user": "user",
"password": "password"
}
}
resp = requests.Response()
resp.status_code = 202
mock_resp.return_value = (resp, None)
ex = self.assertRaises(sol_ex.SolException,
coord_client.create_coordination, endpoint, authentication,
coord_req_example)
expected_message = "Location header not included in response."
self.assertEqual(expected_message, ex.detail)