Add support faultnotification auto heal

Support AutoHealing using FaultNotification interface between
Tacker and VIM.

Provide sample mgmt driver script to register/unregister
FaultNotification interface.

Implements: blueprint support-autoheal-queue
Change-Id: I9ec727c24ba053e928ccebb99dd49ebcaa64bcbb
changes/53/854353/27
Koji Shimizu 5 months ago
parent a5c7189051
commit 20afc2cca4

@ -273,6 +273,11 @@
Multinodes job for SOL V2 devstack-based functional tests
host-vars:
controller-tacker:
devstack_local_conf:
post-config:
$TACKER_CONF:
server_notification:
server_notification: true
tox_envlist: dsvm-functional-sol-v2
- job:

@ -11,6 +11,7 @@ use = egg:Paste#urlmap
/alert/vnf_instances: prometheus_auto_scaling
/alert: prometheus_fm
/pm_event: prometheus_pm
/server_notification: server_notification
[composite:tackerapi_v1_0]
use = call:tacker.auth:pipeline_factory
@ -97,3 +98,6 @@ paste.app_factory = tacker.sol_refactored.api.prometheus_plugin_router:FmAlertRo
[app:prometheus_pm]
paste.app_factory = tacker.sol_refactored.api.prometheus_plugin_router:PmEventRouter.factory
[app:server_notification]
paste.app_factory = tacker.sol_refactored.api.server_notification_router:ServerNotificationRouter.factory

@ -0,0 +1,9 @@
---
features:
- |
Support AutoHealing using FaultNotification interface between Tacker
and VIM.
Provide sample Mgmt driver script to register/unregister
FaultNotification interface.
At the same time, a user guide is also made to help users understand the
function.

@ -16,8 +16,14 @@
from oslo_policy import policy
from tacker.sol_refactored.common import config as cfg
CONF = cfg.CONF
POLICY_NAME = 'os_nfv_orchestration_api_v2:vnf_instances:{}'
SERVER_NOTIFICATION_POLICY_NAME = \
'tacker_server_notification_api:server_notification:{}'
RULE_ANY = '@'
V2_PATH = '/vnflcm/v2'
@ -28,6 +34,7 @@ SUBSCRIPTIONS_PATH = V2_PATH + '/subscriptions'
SUBSCRIPTIONS_ID_PATH = VNF_INSTANCES_PATH + '/{subscriptionId}'
VNF_LCM_OP_OCCS_PATH = V2_PATH + '/vnf_lcm_op_occs'
VNF_LCM_OP_OCCS_ID_PATH = VNF_LCM_OP_OCCS_PATH + '/{vnfLcmOpOccId}'
SERVER_NOTIFICATION_PATH = CONF.server_notification.uri_path_prefix
rules = [
policy.DocumentedRuleDefault(
@ -235,6 +242,18 @@ rules = [
),
]
sn_rules = [
policy.DocumentedRuleDefault(
name=SERVER_NOTIFICATION_POLICY_NAME.format('notify'),
check_str=RULE_ANY,
description="notify",
operations=[
{'method': 'POST',
'path': SERVER_NOTIFICATION_PATH}
]
),
]
def list_rules():
return rules
return rules + sn_rules

@ -0,0 +1,44 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.api.schemas import common_types
ServerNotification = {
'type': 'object',
'properties': {
'notification': {
'type': 'object',
'properties': {
'host_id': common_types.Identifier,
'alarm_id': common_types.Identifier,
'fault_id': {
'type': 'string',
"minLength": 4,
"maxLength": 4
},
'fault_type': {
'type': 'string',
"minLength": 2,
"maxLength": 2
},
'fault_option': {'type': 'object'}
},
'required': ['alarm_id', 'fault_id', 'fault_type'],
'additionalProperties': True
}
},
'required': ['notification']
}

@ -0,0 +1,28 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.api.policies import vnflcm_v2
from tacker.sol_refactored.api import server_notification_wsgi as sn_wsgi
from tacker.sol_refactored.controller import server_notification
class ServerNotificationRouter(sn_wsgi.ServerNotificationAPIRouter):
controller = sn_wsgi.ServerNotificationResource(
server_notification.ServerNotificationController(),
policy_name=vnflcm_v2.SERVER_NOTIFICATION_POLICY_NAME)
route_list = [
("/vnf_instances/{vnf_instance_id}/servers/{server_id}/notify",
{"POST": "notify"})
]

@ -0,0 +1,51 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
from tacker.api.validation import validators
from tacker.common import exceptions as tacker_ex
from tacker.sol_refactored.common import exceptions as sol_ex
# TODO(shimizu-koji): `validators._SchemaValidator` is protected class,
# thus it shouldn't be referred from other modules. This refactoring
# will be done in other patches in the future.
class ServerNotificationSchemaValidator(validators._SchemaValidator):
def validate(self, *args, **kwargs):
try:
super(ServerNotificationSchemaValidator, self).validate(
*args, **kwargs)
except tacker_ex.ValidationError as ex:
raise sol_ex.ServerNotificationValidationError(detail=str(ex))
def schema(request_body_schema):
def add_validator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if 'body' not in kwargs:
raise sol_ex.ServerNotificationValidationError(
detail="body is missing.")
schema_validator = ServerNotificationSchemaValidator(
request_body_schema)
schema_validator.validate(kwargs['body'])
return func(*args, **kwargs)
return wrapper
return add_validator

@ -0,0 +1,82 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from oslo_log import log as logging
from tacker.sol_refactored.api import wsgi as sol_wsgi
LOG = logging.getLogger(__name__)
class ServerNotificationResponse(sol_wsgi.SolResponse):
allowed_headers = ['content_type']
def __init__(self, status, body, **kwargs):
self.status = status
self.body = body
self.headers = {}
for hdr in self.allowed_headers:
if hdr in kwargs:
self.headers[hdr] = kwargs[hdr]
class ServerNotificationErrorResponse(ServerNotificationResponse):
def __init__(self, ex, _):
status = ex.status if hasattr(ex, 'status') else 'error'
detail = ex.status if hasattr(ex, 'detail') else 'error'
problem_details = {'status': status, 'detail': detail}
if hasattr(ex, 'title'):
problem_details['title'] = ex.title
super(ServerNotificationErrorResponse, self).__init__(
problem_details['status'], problem_details)
class ServerNotificationResource(sol_wsgi.SolResource):
def __init__(self, controller, policy_name=None):
super(ServerNotificationResource, self).__init__(
controller, policy_name=policy_name
)
@webob.dec.wsgify(RequestClass=sol_wsgi.SolRequest)
def __call__(self, request):
LOG.info("%(method)s %(url)s", {"method": request.method,
"url": request.url})
try:
action, args, accept = self._deserialize_request(request)
self._check_policy(request, action)
result = self._dispatch(request, action, args)
response = result.serialize(accept)
except Exception as ex:
result = ServerNotificationErrorResponse(ex, request)
try:
response = result.serialize('application/problem+json')
except Exception:
LOG.exception("Unknown error")
return webob.exc.HTTPBadRequest(explanation="Unknown error")
LOG.info("%(url)s returned with HTTP %(status)d",
{"url": request.url, "status": response.status_int})
return response
class ServerNotificationAPIRouter(sol_wsgi.SolAPIRouter):
pass
class ServerNotificationAPIController(sol_wsgi.SolAPIController):
pass

@ -132,8 +132,28 @@ PROMETHEUS_PLUGIN_OPTS = [
CONF.register_opts(PROMETHEUS_PLUGIN_OPTS, 'prometheus_plugin')
SERVER_NOTIFICATION_OPTS = [
cfg.BoolOpt('server_notification',
default=False,
help=_('Enable server notification autohealing')),
cfg.StrOpt('uri_path_prefix',
default='/server_notification',
help=_('Uri path prefix string for server notification. '
'When changing this configuration, '
'server_notification description in api-paste.ini '
'must be changed to the same value.')),
cfg.IntOpt('timer_interval',
default=20,
help=_('Timeout (second) of packing for multiple '
'server notification.')),
]
CONF.register_opts(SERVER_NOTIFICATION_OPTS, 'server_notification')
def config_opts():
return [('v2_nfvo', NFVO_OPTS),
('v2_vnfm', VNFM_OPTS),
('prometheus_plugin', PROMETHEUS_PLUGIN_OPTS)]
('prometheus_plugin', PROMETHEUS_PLUGIN_OPTS),
('server_notification', SERVER_NOTIFICATION_OPTS)]

@ -427,3 +427,12 @@ class PrometheusPluginValidationError(SolValidationError):
class PrometheusSettingFailed(SolHttpError503):
message = _("Setting PM job on External Monitoring Tool failed.")
# server_notification
class ServerNotificationNotEnabled(SolHttpError404):
message = _("ServerNotification API is not enabled.")
class ServerNotificationValidationError(SolValidationError):
message = _("%(detail)s")

@ -94,7 +94,7 @@ class HttpClient(object):
def _decode_body(self, resp):
if resp.status_code == 204: # no content
return
content_type = resp.headers['Content-Type']
content_type = resp.headers.get('Content-Type', '')
if content_type == 'application/zip':
return resp.content
if content_type == 'text/plain':

@ -28,6 +28,9 @@ module_and_class = {
'auto_healing':
('tacker.sol_refactored.common.prometheus_plugin',
'PrometheusPluginAutoScaling'),
'server_notification':
('tacker.sol_refactored.common.server_notification',
'ServerNotification'),
}

@ -0,0 +1,106 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.api.schemas import server_notification_schemas
from tacker.sol_refactored.api import server_notification_validator\
as validator
from tacker.sol_refactored.common import config as cfg
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import monitoring_plugin_base as mon_base
from tacker.sol_refactored.common import vnf_instance_utils as inst_utils
from tacker.sol_refactored.conductor import conductor_rpc_v2
CONF = cfg.CONF
class ServerNotification(mon_base.MonitoringPlugin):
_instance = None
@staticmethod
def instance():
if not ServerNotification._instance:
if not CONF.server_notification.server_notification:
stub = mon_base.MonitoringPluginStub.instance()
ServerNotification._instance = stub
else:
ServerNotification()
return ServerNotification._instance
def __init__(self):
if ServerNotification._instance:
raise SystemError(
"Not constructor but instance() should be used.")
self.set_callback(self.default_callback)
self.rpc = conductor_rpc_v2.VnfLcmRpcApiV2()
ServerNotification._instance = self
def set_callback(self, notification_callback):
self._notification_callback = notification_callback
def alert(self, **kwargs):
self.notify(
kwargs['request'], kwargs['vnf_instance_id'],
body=kwargs['body'])
def default_callback(self, context, vnf_instance_id, vnfcids):
self.rpc.server_notification_notify(context, vnf_instance_id, vnfcids)
def get_vnfc_instance_id(
self, context, vnf_instance_id, alarm_id, fault_id):
vnf_instance = inst_utils.get_inst(context, vnf_instance_id)
if not vnf_instance:
raise sol_ex.ServerNotificationValidationError(
detail="target vnf instance not found.")
if (not vnf_instance.obj_attr_is_set('instantiatedVnfInfo') or
not vnf_instance.instantiatedVnfInfo.obj_attr_is_set(
'metadata') or
not vnf_instance.instantiatedVnfInfo.obj_attr_is_set(
'vnfcResourceInfo') or
not vnf_instance.instantiatedVnfInfo.obj_attr_is_set(
'vnfcInfo')):
raise sol_ex.ServerNotificationValidationError(
detail="access info not found in the vnf instance.")
if fault_id != vnf_instance.instantiatedVnfInfo.metadata.get(
'ServerNotifierFaultID'):
raise sol_ex.ServerNotificationValidationError(
detail="fault_id does not match.")
# Get the list of instantiatedVnfInfo.vnfcInfo[x].id where
# vnfcInfo[x].vnfcResourceInfoId == vnfcResourceInfo[y].id and
# vnfcResourceInfo[y].metadata.alarmId == alarm_id
rsc_info = filter(lambda x: ('metadata' in x and
alarm_id == x['metadata'].get('alarmId')),
vnf_instance.instantiatedVnfInfo.vnfcResourceInfo)
rsc_ids = list(map(lambda x: x['id'], rsc_info))
vnfc_info = filter(lambda x:
(x.obj_attr_is_set('vnfcResourceInfoId') and
x.vnfcResourceInfoId in rsc_ids),
vnf_instance.instantiatedVnfInfo.vnfcInfo)
vnfc_ids = list(map(lambda x: x.id, vnfc_info))
if len(vnfc_ids) == 0:
raise sol_ex.ServerNotificationValidationError(
detail="target vnfc not found.")
return vnfc_ids
@validator.schema(server_notification_schemas.ServerNotification)
def notify(self, request, vnf_instance_id, body):
context = request.context
vnfcids = self.get_vnfc_instance_id(
context, vnf_instance_id, body['notification']['alarm_id'],
body['notification']['fault_id'])
if self._notification_callback:
self._notification_callback(context, vnf_instance_id, vnfcids)

@ -51,6 +51,25 @@ class VnfLcmRpcApiV2(object):
def modify_vnfinfo(self, context, lcmocc_id):
self._cast_lcm_op(context, lcmocc_id, 'modify_vnfinfo')
def server_notification_cast(self, context, method, **kwargs):
serializer = objects_base.TackerObjectSerializer()
client = rpc.get_client(
self.target, version_cap=None, serializer=serializer)
cctxt = client.prepare()
cctxt.cast(context, method, **kwargs)
def server_notification_notify(
self, context, vnf_instance_id, vnfc_instance_ids):
self.server_notification_cast(
context, 'server_notification_notify',
vnf_instance_id=vnf_instance_id,
vnfc_instance_ids=vnfc_instance_ids)
def server_notification_remove_timer(self, context, vnf_instance_id):
self.server_notification_cast(
context, 'server_notification_remove_timer',
vnf_instance_id=vnf_instance_id)
TOPIC_PROMETHEUS_PLUGIN = 'TACKER_PROMETHEUS_PLUGIN'

@ -23,6 +23,7 @@ from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import lcm_op_occ_utils as lcmocc_utils
from tacker.sol_refactored.common import vnf_instance_utils as inst_utils
from tacker.sol_refactored.conductor import prometheus_plugin_driver as pp_drv
from tacker.sol_refactored.conductor import server_notification_driver as sdrv
from tacker.sol_refactored.conductor import vnffm_driver_v1
from tacker.sol_refactored.conductor import vnflcm_driver_v2
from tacker.sol_refactored.conductor import vnfpm_driver_v2
@ -45,7 +46,7 @@ class ConductorV2(object):
self.endpoint = CONF.v2_vnfm.endpoint
self.nfvo_client = nfvo_client.NfvoClient()
self.prom_driver = pp_drv.PrometheusPluginDriver.instance()
self.sn_driver = sdrv.ServerNotificationDriver.instance()
self._change_lcm_op_state()
def _change_lcm_op_state(self):
@ -330,3 +331,12 @@ class ConductorV2(object):
@log.log
def request_scale(self, context, id, scale_req):
self.prom_driver.request_scale(context, id, scale_req)
@log.log
def server_notification_notify(
self, context, vnf_instance_id, vnfc_instance_ids):
self.sn_driver.notify(vnf_instance_id, vnfc_instance_ids)
@log.log
def server_notification_remove_timer(self, context, vnf_instance_id):
self.sn_driver.remove_timer(vnf_instance_id)

@ -0,0 +1,127 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
from oslo_log import log as logging
from tacker.sol_refactored.common import config as cfg
from tacker.sol_refactored.common import http_client
from tacker.sol_refactored.conductor import conductor_rpc_v2 as rpc
from tacker.sol_refactored import objects
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class ServerNotificationTimer():
def __init__(self, vnf_instance_id, expiration_time, expiration_handler):
self.lock = threading.Lock()
self.expired = False
self.queue = []
self.vnf_instance_id = vnf_instance_id
self.expiration_handler = expiration_handler
self.timer = threading.Timer(expiration_time, self.expire)
self.timer.start()
def expire(self):
_expired = False
with self.lock:
if not self.expired:
self._cancel()
_expired = True
if _expired:
self.expiration_handler(
self.vnf_instance_id, list(set(self.queue)))
def append(self, vnfc_instance_ids):
with self.lock:
if not self.expired:
self.queue.extend(vnfc_instance_ids)
def _cancel(self):
self.timer.cancel()
self.expired = True
def cancel(self):
with self.lock:
if not self.expired:
self._cancel()
def __del__(self):
self.cancel()
class ServerNotificationDriver():
_instance = None
@staticmethod
def instance():
if not ServerNotificationDriver._instance:
ServerNotificationDriver._instance = (
ServerNotificationDriverMain()
if CONF.server_notification.server_notification
else ServerNotificationDriver())
return ServerNotificationDriver._instance
def notify(self, vnf_instance_id, vnfc_instance_ids):
pass
def remove_timer(self, vnf_instance_id):
pass
class ServerNotificationDriverMain(ServerNotificationDriver):
def __init__(self):
self.timer_map = {}
self.expiration_time = CONF.server_notification.timer_interval
auth_handle = http_client.KeystonePasswordAuthHandle(
auth_url=CONF.keystone_authtoken.auth_url,
username=CONF.keystone_authtoken.username,
password=CONF.keystone_authtoken.password,
project_name=CONF.keystone_authtoken.project_name,
user_domain_name=CONF.keystone_authtoken.user_domain_name,
project_domain_name=CONF.keystone_authtoken.project_domain_name)
self.client = http_client.HttpClient(auth_handle)
sn_auth_handle = http_client.NoAuthHandle()
self.sn_client = http_client.HttpClient(sn_auth_handle)
self.rpc = rpc.VnfLcmRpcApiV2()
def notify(self, vnf_instance_id, vnfc_instance_ids):
if vnf_instance_id not in self.timer_map:
self.timer_map[vnf_instance_id] = ServerNotificationTimer(
vnf_instance_id, self.expiration_time, self.timer_expired)
self.timer_map[vnf_instance_id].append(vnfc_instance_ids)
def remove_timer(self, vnf_instance_id):
if vnf_instance_id in self.timer_map:
self.timer_map[vnf_instance_id].cancel()
del self.timer_map[vnf_instance_id]
def request_heal(self, vnf_instance_id, vnfc_instance_ids):
heal_req = objects.HealVnfRequest(vnfcInstanceId=vnfc_instance_ids)
LOG.info("server_notification auto healing is processed: %s.",
vnf_instance_id)
ep = CONF.v2_vnfm.endpoint
url = f'{ep}/vnflcm/v2/vnf_instances/{vnf_instance_id}/heal'
resp, body = self.client.do_request(
url, "POST", body=heal_req.to_dict(), version="2.0.0")
if resp.status_code != 202:
LOG.error(str(body))
LOG.error("server_notification auto healing is failed: %d.",
resp.status_code)
def timer_expired(self, vnf_instance_id, vnfc_instance_ids):
self.remove_timer(vnf_instance_id)
self.request_heal(vnf_instance_id, vnfc_instance_ids)

@ -123,6 +123,17 @@ class VnfLcmDriverV2(object):
LOG.debug("execute %s failed: %s", operation, out.stderr)
msg = "{} failed: {}".format(operation, out.stderr)
raise sol_ex.MgmtDriverExecutionFailed(sol_detail=msg)
else:
try:
output = pickle.loads(out.stdout)
if isinstance(output, dict) and 'vnf_instance' in output:
_inst = objects.VnfInstanceV2.from_dict(
output['vnf_instance'])
inst.__dict__.update(_inst.__dict__)
except EOFError:
pass
except pickle.UnpicklingError:
pass
LOG.debug("execute %s of %s success.", operation, script)

@ -0,0 +1,32 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.sol_refactored.api import server_notification_wsgi as sn_wsgi
from tacker.sol_refactored.common import config as cfg
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import monitoring_plugin_base as mon_base
CONF = cfg.CONF
class ServerNotificationController(sn_wsgi.ServerNotificationAPIController):
def notify(self, request, vnf_instance_id, server_id, body):
if not CONF.server_notification.server_notification:
raise sol_ex.ServerNotificationNotEnabled()
cls = mon_base.get_class('server_notification')
mon_base.MonitoringPlugin.get_instance(cls).alert(
request=request, vnf_instance_id=vnf_instance_id,
server_id=server_id, body=body)
return sn_wsgi.ServerNotificationResponse(204, None)

@ -1228,7 +1228,13 @@ class Openstack(object):
inst_vnf_info.vnfcInfo = vnfc_infos
inst_vnf_info.metadata = {}
# restore metadata
if (inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set('metadata')):
inst_vnf_info.metadata.update(inst.instantiatedVnfInfo.metadata)
# store stack_id into metadata
metadata = {"stack_id": stack_id}
inst_vnf_info.metadata = metadata
inst_vnf_info.metadata['stack_id'] = stack_id
inst.instantiatedVnfInfo = inst_vnf_info

@ -0,0 +1,249 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import pickle
import sys
from oslo_log import log as logging
from tacker.common import exceptions as common_ex
from tacker.sol_refactored.common import config as cfg
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import http_client
from tacker.sol_refactored.conductor import conductor_rpc_v2
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class ServerNotificationMgmtDriver(object):
def __init__(self, req, inst, grant_req, grant, csar_dir):
self.req = req
self.inst = inst
self.grant_req = grant_req
self.grant = grant
self.csar_dir = csar_dir
auth_handle = http_client.NoAuthHandle()
self.client = http_client.HttpClient(auth_handle)
self.rpc = conductor_rpc_v2.VnfLcmRpcApiV2()
def make_output_dict(self):
return {'vnf_instance': self.inst}
def request_remove_timer(self, vnf_instance_id):
self.rpc.server_notification_remove_timer(
None, vnf_instance_id)
def _request_unregister(
self, server_notifier_uri, tenant, server_id, alarm_id):
try:
url = (f'{server_notifier_uri}/v2/{tenant}/servers/{server_id}/'
f'alarms/{alarm_id}')
resp, _ = self.client.do_request(
url, "DELETE")
if resp.status_code >= 400:
LOG.error(
"server_notification unregistration is failed: %d.",
resp.status_code)
else:
LOG.debug(
"server_notification unregistration is processed: %d.",
resp.status_code)
except sol_ex.SolException as e:
# Even if unregistration is failed for a single alarm_id,
# Unregistration should be done for remaining alarm_ids.
LOG.error(str(e))
def request_unregister(
self, isall=True, vnfc_res_ids=None):
server_notifier_uri, _, tenant = self.get_params()
found = False
res_ids = vnfc_res_ids if vnfc_res_ids else []
for rsc in self.inst['instantiatedVnfInfo']['vnfcResourceInfo']:
if ('metadata' in rsc and
'alarmId' in rsc['metadata'] and (isall or
rsc['computeResource']['resourceId'] in res_ids)):
found = True
alarm_id = rsc['metadata']['alarmId']
del rsc['metadata']['alarmId']
server_id = rsc['computeResource']['resourceId']
self._request_unregister(
server_notifier_uri, tenant, server_id, alarm_id)
return found
def request_register_cancel(self, rsc_list):
server_notifier_uri, _, tenant = self.get_params()
for rsc in rsc_list:
alarm_id = rsc['metadata']['alarmId']
del rsc['metadata']['alarmId']
server_id = rsc['computeResource']['resourceId']
self._request_unregister(
server_notifier_uri, tenant, server_id, alarm_id)
def _request_register(self, vnfc_resource):
server_id = vnfc_resource['computeResource']['resourceId']
server_notifier_uri, fault_id, tenant = self.get_params()
url = f'{server_notifier_uri}/v2/{tenant}/servers/{server_id}/alarms'
endpoint = CONF.v2_vnfm.endpoint
prefix = CONF.server_notification.uri_path_prefix
_id = self.inst['id']
fault_action = (f'{endpoint}/{prefix}/vnf_instances/{_id}/'
f'servers/{server_id}/notify')
req_body = {
'fault_action': fault_action,
'fault_id': fault_id
}
resp, res_body = self.client.do_request(
url, "POST", body=req_body)
if resp.status_code >= 400 or 'alarm_id' not in res_body:
msg = ("server_notification registration is "
f"failed: {resp.status_code}.")
raise common_ex.MgmtDriverOtherError(error_message=msg)
if 'metadata' not in vnfc_resource:
vnfc_resource['metadata'] = {}
vnfc_resource['metadata']['alarmId'] = res_body['alarm_id']
LOG.debug(
"server_notification registration is processed: %d. "
"alarm_id: %s", resp.status_code, res_body['alarm_id'])
return {'alarmId': res_body['alarm_id'], 'serverId': server_id}
def request_register(self):
rsc_list = []
for rsc in self.inst['instantiatedVnfInfo']['vnfcResourceInfo']:
if ('metadata' not in rsc or
'alarmId' not in rsc['metadata']):
try:
self._request_register(rsc)
rsc_list.append(rsc)
except sol_ex.SolException as e:
LOG.error(str(e))
self.request_register_cancel(rsc_list)
msg = "ServerNotification registration is failed."
raise common_ex.MgmtDriverOtherError(
error_message=msg) from e
def get_params(self):
server_notifier_uri = None
fault_id = None
tenant = None
additional_params = self.req.get('additionalParams', None)
if 'instantiatedVnfInfo' not in self.inst:
return (None, None, None)
vnf_info = self.inst['instantiatedVnfInfo']
if (additional_params and
'ServerNotifierUri' in additional_params and
'ServerNotifierFaultID' in additional_params):
server_notifier_uri = additional_params['ServerNotifierUri']
fault_id = additional_params['ServerNotifierFaultID']
elif (vnf_info and 'metadata' in vnf_info and
'ServerNotifierUri' in vnf_info['metadata'] and
'ServerNotifierFaultID' in vnf_info['metadata']):
server_notifier_uri = vnf_info['metadata']['ServerNotifierUri']
fault_id = vnf_info['metadata']['ServerNotifierFaultID']
if 'vimConnectionInfo' in self.inst:
for vim_info in self.inst['vimConnectionInfo'].values():
if ('accessInfo' in vim_info and
'project' in vim_info['accessInfo']):
tenant = vim_info['accessInfo']['project']
if server_notifier_uri and fault_id and tenant:
return (server_notifier_uri, fault_id, tenant)
return (None, None, None)
def terminate_start(self):
at_least_one_id_unregistered = self.request_unregister()
if at_least_one_id_unregistered:
self.request_remove_timer(self.inst['id'])
for key in ['ServerNotifierUri', 'ServerNotifierFaultID']:
if ('metadata' in self.inst['instantiatedVnfInfo'] and
key in self.inst['instantiatedVnfInfo']['metadata']):
del self.inst['instantiatedVnfInfo']['metadata'][key]
return self.make_output_dict()
def scale_start(self):
if self.req['type'] != 'SCALE_IN':
return
vnfc_res_ids = [res_def['resource']['resourceId']
for res_def in self.grant_req['removeResources']
if res_def.get('type', None) == 'COMPUTE']
self.request_unregister(
isall=False, vnfc_res_ids=vnfc_res_ids)
return self.make_output_dict()
def heal_start(self):
isall = ('additionalParams' in self.req and
self.req['additionalParams'].get('all', False) and
'vnfcInstanceId' not in self.req)
vnfc_res_ids = [res_def['resource']['resourceId']
for res_def in self.grant_req['removeResources']
if res_def.get('type', None) == 'COMPUTE']
self.request_unregister(
isall=isall, vnfc_res_ids=vnfc_res_ids)
return self.make_output_dict()
def instantiate_end(self):
self.request_register()
server_notifier_uri, fault_id, _ = self.get_params()
vnf_info = self.inst['instantiatedVnfInfo']
if 'metadata' not in vnf_info:
vnf_info['metadata'] = {}
vnf_info['metadata']['ServerNotifierUri'] = server_notifier_uri
vnf_info['metadata']['ServerNotifierFaultID'] = fault_id
return self.make_output_dict()
def scale_end(self):
if self.req['type'] != 'SCALE_OUT':
return
self.request_register()
return self.make_output_dict()
def heal_end(self):
self.request_register()
return self.make_output_dict()
def instantiate_start(self):
pass
def terminate_end(self):
pass
def main():
script_dict = pickle.load(sys.stdin.buffer)
operation = script_dict['operation']
req = script_dict['request']
inst = script_dict['vnf_instance']
grant_req = script_dict['grant_request']
grant = script_dict['grant_response']
csar_dir = script_dict['tmp_csar_dir']
script = ServerNotificationMgmtDriver(
req, inst, grant_req, grant, csar_dir)
output_dict = getattr(script, operation)()
sys.stdout.buffer.write(pickle.dumps(output_dict))
sys.stdout.flush()
if __name__ == "__main__":
try:
main()
os._exit(0)
except Exception as ex:
sys.stderr.write(str(ex))
sys.stderr.flush()
os._exit(1)

@ -141,7 +141,8 @@ def PrepareRequestHandler(manager):
return (status_code, mock_headers, mock_body)
def do_DELETE(self):
raise NotImplementedError
self.send_response(http.HTTPStatus.NO_CONTENT)
self.end_headers()
def do_GET(self):
"""Process GET request"""
@ -175,8 +176,14 @@ def PrepareRequestHandler(manager):
if self._is_match_with_list():
# Request is registered in our list.
tplUri = urlparse(self.path)
self._returned_callback(tplUri.path,
manager._funcs_posts[tplUri.path])
if self.path.startswith('/server_notification'):
for key in manager._funcs_posts.keys():
if self.path.startswith(key):
self._returned_callback(tplUri.path,
manager._funcs_posts[key])
else:
self._returned_callback(tplUri.path,
manager._funcs_posts[tplUri.path])
else:
# Unregistered URI is requested
LOG.debug('POST Recv. Unknown URL: "%s"' % self.path)

@ -0,0 +1,442 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import os
import time
from tacker.objects import fields
from tacker.tests.functional.sol_v2_common import base_v2
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import test_vnflcm_basic_common
test_count = 0
def make_alarm_id(header, body):
from tacker.tests.functional.sol_v2 import test_server_notification
id = 'alarm_id_' + str(test_server_notification.test_count)
return {'alarm_id': id}
@ddt.ddt
class ServerNotificationTest(test_vnflcm_basic_common.CommonVnfLcmTest):
@classmethod
def setUpClass(cls):
super(ServerNotificationTest, cls).setUpClass()
cur_dir = os.path.dirname(__file__)
# tacker/tests/etc...
# /functional/sol_v2
image_dir = os.path.join(
cur_dir, "../../etc/samples/etsi/nfv/common/Files/images")
image_file = "cirros-0.5.2-x86_64-disk.img"
image_path = os.path.abspath(os.path.join(image_dir, image_file))
# for basic lcms tests max pattern
basic_lcms_max_path = os.path.join(cur_dir, "../sol_v2_common/"
"samples/basic_lcms_max")
cls.vnf_pkg_1, cls.vnfd_id_1 = cls.create_vnf_package(
basic_lcms_max_path, image_path=image_path)
# for basic lcms tests min pattern
basic_lcms_min_path = os.path.join(cur_dir, "../sol_v2_common/"
"samples/basic_lcms_min")
# no image contained
cls.vnf_pkg_2, cls.vnfd_id_2 = cls.create_vnf_package(
basic_lcms_min_path)
# for update vnf test
update_vnf_path = os.path.join(cur_dir, "../sol_v2_common/"
"samples/update_vnf")
# no image contained
cls.vnf_pkg_3, cls.vnfd_id_3 = cls.create_vnf_package(update_vnf_path)
# for server_notification test
server_notification_path = os.path.join(
cur_dir, "../sol_v2_common/samples/server_notification")
# no image contained
cls.svn_pkg, cls.svn_id = cls.create_vnf_package(
server_notification_path)
@classmethod
def tearDownClass(cls):
super(ServerNotificationTest, cls).tearDownClass()
cls.delete_vnf_package(cls.vnf_pkg_1)
cls.delete_vnf_package(cls.vnf_pkg_2)
cls.delete_vnf_package(cls.vnf_pkg_3)
cls.delete_vnf_package(cls.svn_pkg)
def setUp(self):
super().setUp()
def _check_package_usage(self, is_nfvo, package_id, state='NOT_IN_USE'):
if not is_nfvo:
usage_state = self.get_vnf_package(package_id)['usageState']
self.assertEqual(state, usage_state)
def fault_notification_queueing_test(self):
"""Test Fault Notification with queueing
* About Test operations:
This test includes the following operations.
- 1. LCM-Create
- 2. LCM-Instantiate
- 3. ServerNotifier-Notify (multiple times)
- 4. LCM-Heal
- 5. LCM-Scale (SCALE_OUT)
- 6. LCM-Scale (SCALE_IN)
- 7. LCM-Terminate
- 8. LCM-Delete
"""
self.fault_notification_basic_test(repeat=3)
def fault_notification_basic_test(self, repeat=1):
"""Test Fault Notification basic
* About Test operations:
This test includes the following operations.
- 1. LCM-Create
- 2. LCM-Instantiate
- 3. ServerNotifier-Notify
- 4. LCM-Heal
- 5. LCM-Scale (SCALE_OUT)
- 6. LCM-Scale (SCALE_IN)
- 7. LCM-Terminate
- 8. LCM-Delete
"""
is_nfvo = False
# 0. Pre setting
create_req = paramgen.create_vnf_min(self.svn_id)
# Create subscription
callback_url = os.path.join(base_v2.MOCK_NOTIFY_CALLBACK_URL,
self._testMethodName)
callback_uri = ('http://localhost:'
f'{base_v2.FAKE_SERVER_MANAGER.SERVER_PORT}'
f'{callback_url}')
server_notification_uri = ('http://localhost:'
f'{base_v2.FAKE_SERVER_MANAGER.SERVER_PORT}'
'/server_notification')
base_v2.FAKE_SERVER_MANAGER.set_callback(
'POST',
'/server_notification',
status_code=200,
response_headers={"Content-Type": "application/json"},
callback=make_alarm_id
)
sub_req = paramgen.sub_create_min(callback_uri)
resp, body = self.create_subscription(sub_req)
self.assertEqual(201, resp.status_code)
self.check_resp_headers_in_create(resp)
sub_id = body['id']
# Test notification
self.assert_notification_get(callback_url)
# check usageState of VNF Package
self._check_package_usage(is_nfvo, self.svn_pkg)
# 1. LCM-Create
# ETSI NFV SOL003 v3.3.1 5.5.2.2 VnfInstance
expected_inst_attrs = [
'id',
'vnfdId',
'vnfProvider',
'vnfProductName',
'vnfSoftwareVersion',
'vnfdVersion',
'instantiationState',
'_links'
]
resp, body = self.create_vnf_instance(create_req)
self.assertEqual(201, resp.status_code)
self.check_resp_headers_in_create(resp)
self.check_resp_body(body, expected_inst_attrs)
inst_id = body['id']
# check usageState of VNF Package
self._check_package_usage(is_nfvo, self.svn_pkg, 'IN_USE')
# check instantiationState of VNF
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
self.assertEqual(fields.VnfInstanceState.NOT_INSTANTIATED,
body['instantiationState'])
# 2. LCM-Instantiate
instantiate_req = paramgen.instantiate_vnf_min()
instantiate_req['additionalParams'] = {
'ServerNotifierUri': server_notification_uri,
'ServerNotifierFaultID': '1234'
}
resp, body = self.instantiate_vnf_instance(inst_id, instantiate_req)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_operation_task(resp)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
# check creation of Heat-stack
stack_name = f'vnf-{inst_id}'
stack_status, _ = self.heat_client.get_status(stack_name)
self.assertEqual("CREATE_COMPLETE", stack_status)
# check that the servers set in "nfvi_node:Affinity" are
# deployed on the same host.
# NOTE: it's up to heat to decide which host to deploy to
vdu1_details = self.get_server_details('VDU1')
vdu2_details = self.get_server_details('VDU2')
vdu1_host = vdu1_details['hostId']
vdu2_host = vdu2_details['hostId']
self.assertEqual(vdu1_host, vdu2_host)
# Show VNF instance
additional_inst_attrs = [
'vimConnectionInfo',
'instantiatedVnfInfo'
]
expected_inst_attrs.extend(additional_inst_attrs)
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
self.check_resp_body(body, expected_inst_attrs)
# check instantiationState of VNF
self.assertEqual(fields.VnfInstanceState.INSTANTIATED,
body['instantiationState'])
# check vnfState of VNF
self.assertEqual(fields.VnfOperationalStateType.STARTED,
body['instantiatedVnfInfo']['vnfState'])
# 3. ServerNotifier-Notify
for i in range(repeat):
alarm_id = 'alarm_id_0'
fault_notification_param = paramgen.server_notification(alarm_id)
resp, body = self.server_notification(
inst_id, 'server_id', fault_notification_param)
self.assertTrue(resp.status_code == 204 or resp.status_code == 404)
time.sleep(1)
# waiting for auto healing process complete after packing timer.
time.sleep(60)
# 4. LCM-Heal
nested_stacks = self.heat_client.get_resources(stack_name)
temp_stacks = [stack for stack in nested_stacks if
(stack['resource_name'] in ['VDU1', 'VDU2'])]
vdu1_stack_before_heal = [stack for stack in temp_stacks if
(stack['resource_name'] == 'VDU1')][0]
vdu2_stack_before_heal = [stack for stack in temp_stacks if
(stack['resource_name'] == 'VDU2')][0]
heal_req = paramgen.heal_vnf_all_min()
resp, body = self.heal_vnf_instance(inst_id, heal_req)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_operation_task(resp)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
# check stack info
stack_status, _ = self.heat_client.get_status(stack_name)
self.assertEqual("UPDATE_COMPLETE", stack_status)
nested_stacks = self.heat_client.get_resources(stack_name)
temp_stacks = [stack for stack in nested_stacks if
(stack['resource_name'] in ['VDU1', 'VDU2'])]
vdu1_stack_after_heal = [stack for stack in temp_stacks if
(stack['resource_name'] == 'VDU1')][0]
vdu2_stack_after_heal = [stack for stack in temp_stacks if
(stack['resource_name'] == 'VDU2')][0]
self.assertEqual("CREATE_COMPLETE",
vdu1_stack_after_heal['resource_status'])
self.assertEqual("CREATE_COMPLETE",
vdu2_stack_after_heal['resource_status'])
self.assertNotEqual(vdu1_stack_before_heal['physical_resource_id'],
vdu1_stack_after_heal['physical_resource_id'])
self.assertNotEqual(vdu2_stack_before_heal['physical_resource_id'],
vdu2_stack_after_heal['physical_resource_id'])
# Show VNF instance
additional_inst_attrs = [
'vimConnectionInfo',
'instantiatedVnfInfo'
]
expected_inst_attrs.extend(additional_inst_attrs)
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
self.check_resp_body(body, expected_inst_attrs)
# check instantiationState of VNF
self.assertEqual(fields.VnfInstanceState.INSTANTIATED,
body['instantiationState'])
# check vnfState of VNF
self.assertEqual(fields.VnfOperationalStateType.STARTED,
body['instantiatedVnfInfo']['vnfState'])
# check usageState of VNF Package 2
self._check_package_usage(is_nfvo, self.svn_pkg, 'IN_USE')
self.assertEqual(self.svn_id, body['vnfdId'])
# Heal VNF(vnfc)
nested_stacks = self.heat_client.get_resources(stack_name)
temp_stacks = [stack for stack in nested_stacks if
(stack['resource_name'] == 'VDU2')]
vdu2_stack_before_heal = temp_stacks[0]
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
vnfc_info = body['instantiatedVnfInfo']['vnfcInfo']
self.assertGreater(len(vnfc_info), 1)
vnfc_id = [vnfc['id'] for vnfc in vnfc_info if (
"VDU2" == vnfc['vduId'])][0]
self.assertIsNotNone(vnfc_id)
heal_req = paramgen.heal_vnf_vnfc_min(vnfc_id)
resp, body = self.heal_vnf_instance(inst_id, heal_req)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_operation_task(resp)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
# check stack info
stack_status, _ = self.heat_client.get_status(stack_name)
self.assertEqual("UPDATE_COMPLETE", stack_status)
nested_stacks = self.heat_client.get_resources(stack_name)
temp_stacks = [stack for stack in nested_stacks if
(stack['resource_name'] == 'VDU2')]
vdu2_stack_after_heal = temp_stacks[0]
self.assertEqual("CREATE_COMPLETE",
vdu2_stack_after_heal['resource_status'])
self.assertNotEqual(vdu2_stack_before_heal['physical_resource_id'],
vdu2_stack_after_heal['physical_resource_id'])
# Show VNF instance
additional_inst_attrs = [
'vimConnectionInfo',
'instantiatedVnfInfo'
]
expected_inst_attrs.extend(additional_inst_attrs)
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
self.check_resp_body(body, expected_inst_attrs)
# check vnfState of VNF
self.assertEqual(fields.VnfOperationalStateType.STARTED,
body['instantiatedVnfInfo']['vnfState'])
# 5. LCM-Scale (SCALE_OUT)
# get nested stack count before scaleout
nested_stacks = self.heat_client.get_resources(stack_name)
count_before_scaleout = len(nested_stacks)
scaleout_req = paramgen.scaleout_vnf_min()
resp, body = self.scale_vnf_instance(inst_id, scaleout_req)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_operation_task(resp)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
# Show VNF instance
additional_inst_attrs = [
'vimConnectionInfo',
'instantiatedVnfInfo'
]
expected_inst_attrs.extend(additional_inst_attrs)
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
self.check_resp_body(body, expected_inst_attrs)
# check vnfState of VNF
self.assertEqual(fields.VnfOperationalStateType.STARTED,
body['instantiatedVnfInfo']['vnfState'])
# get nested stack count after scale out
nested_stacks = self.heat_client.get_resources(stack_name)
count_after_scaleout = len(nested_stacks)
# check nested stack was created
# 3 was the sum of 1 VM, 1 CP, 1 stack(VDU1.yaml)
self.assertEqual(3, count_after_scaleout - count_before_scaleout)