[WIP] Add support VNFM auto heal and scale

This patch provides some implementations for supporting receiving
alerts from External Monitoring Tools and VNFM(tacker)-driven
AutoHeal and AutoScale without NFVO.

Implements: blueprint support-auto-lcm
Change-Id: Ib0b5fd9264d80b9666ce69190e0ee41bbde23fac
This commit is contained in:
Yi Feng 2023-01-20 12:59:36 +09:00
parent 01af921686
commit 3a979eb625
15 changed files with 689 additions and 181 deletions

View File

@ -8,7 +8,8 @@ use = egg:Paste#urlmap
/vnflcm/v2: vnflcm_v2
/vnffm/v1: vnffm_v1
/vnfpm/v2: vnfpm_v2
/alert/vnf_instances: prometheus_auto_scaling
/alert/auto_scaling: prometheus_auto_scaling
/alert/auto_healing: prometheus_auto_healing
/alert: prometheus_fm
/pm_event: prometheus_pm
/server_notification: server_notification
@ -93,6 +94,9 @@ paste.app_factory = tacker.sol_refactored.api.router:VnffmAPIRouterV1.factory
[app:prometheus_auto_scaling]
paste.app_factory = tacker.sol_refactored.api.prometheus_plugin_router:AutoScalingRouter.factory
[app:prometheus_auto_healing]
paste.app_factory = tacker.sol_refactored.api.prometheus_plugin_router:AutoHealingRouter.factory
[app:prometheus_fm]
paste.app_factory = tacker.sol_refactored.api.prometheus_plugin_router:FmAlertRouter.factory

View File

@ -27,7 +27,8 @@ REPORT_GET = '/vnfpm/v2/pm_jobs/{id}/reports/{report_id}'
POLICY_NAME_PROM_PLUGIN = 'tacker_PROM_PLUGIN_api:PROM_PLUGIN:{}'
PROM_PLUGIN_PM_PATH = '/pm_event'
PROM_PLUGIN_AUTO_SCALING_PATH = '/alert/vnf_instances'
PROM_PLUGIN_AUTO_HEALING_PATH = '/alert/auto_healing'
PROM_PLUGIN_AUTO_SCALING_PATH = '/alert/auto_scaling'
rules = [
policy.DocumentedRuleDefault(
@ -107,6 +108,15 @@ rules = [
'path': PROM_PLUGIN_PM_PATH}
]
),
policy.DocumentedRuleDefault(
name=POLICY_NAME_PROM_PLUGIN.format('auto_healing'),
check_str=RULE_ANY,
description="auto_healing",
operations=[
{'method': 'POST',
'path': PROM_PLUGIN_AUTO_HEALING_PATH}
]
),
policy.DocumentedRuleDefault(
name=POLICY_NAME_PROM_PLUGIN.format('auto_scaling'),
check_str=RULE_ANY,
@ -115,15 +125,6 @@ rules = [
{'method': 'POST',
'path': PROM_PLUGIN_AUTO_SCALING_PATH}
]
),
policy.DocumentedRuleDefault(
name=POLICY_NAME_PROM_PLUGIN.format('auto_scaling_id'),
check_str=RULE_ANY,
description="auto_scaling_id",
operations=[
{'method': 'POST',
'path': PROM_PLUGIN_AUTO_SCALING_PATH + '/{vnfInstanceId}'}
]
)
]

View File

@ -33,11 +33,15 @@ class FmAlertRouter(prom_wsgi.PrometheusPluginAPIRouter):
route_list = [("", {"POST": "alert"})]
class AutoHealingRouter(prom_wsgi.PrometheusPluginAPIRouter):
controller = prom_wsgi.PrometheusPluginResource(
prometheus_plugin_controller.AutoHealingController(),
policy_name=vnfpm_policy_v2.POLICY_NAME_PROM_PLUGIN)
route_list = [("", {"POST": "auto_healing"})]
class AutoScalingRouter(prom_wsgi.PrometheusPluginAPIRouter):
controller = prom_wsgi.PrometheusPluginResource(
prometheus_plugin_controller.AutoScalingController(),
policy_name=vnfpm_policy_v2.POLICY_NAME_PROM_PLUGIN)
route_list = [
("", {"POST": "auto_scaling"}),
("/{id}", {"POST": "auto_scaling"})
]
route_list = [("", {"POST": "auto_scaling"})]

View File

@ -34,11 +34,12 @@ Alert = {
},
'function_type': {
'type': 'string',
'enum': ['vnffm', 'vnfpm', 'auto_scale']
'enum': ['vnffm', 'vnfpm', 'auto_scale', 'auto_heal']
},
'job_id': {'type': 'string'},
'object_instance_id': {'type': 'string'},
'vnf_instance_id': {'type': 'string'},
'vnfc_info_id': {'type': 'string'},
'node': {'type': 'string'},
'perceived_severity': {
'type': 'string',

View File

@ -122,6 +122,9 @@ PROMETHEUS_PLUGIN_OPTS = [
cfg.BoolOpt('fault_management',
default=False,
help=_('Enable prometheus plugin fault management')),
cfg.BoolOpt('auto_healing',
default=False,
help=_('Enable prometheus plugin autohealing')),
cfg.BoolOpt('auto_scaling',
default=False,
help=_('Enable prometheus plugin autoscaling')),
@ -149,6 +152,22 @@ PROMETHEUS_PLUGIN_OPTS = [
'This configuration is changed in case of replacing '
'the original function with a vendor specific '
'function.')),
cfg.StrOpt('auto_healing_package',
default='tacker.sol_refactored.common.prometheus_plugin',
help=_('Package name for auto healing. '
'This configuration is changed in case of replacing '
'the original function with a vendor specific '
'function.')),
cfg.StrOpt('auto_healing_class',
default='PrometheusPluginAutoHealing',
help=_('Class name for auto healing. '
'This configuration is changed in case of replacing '
'the original function with a vendor specific '
'function.')),
cfg.IntOpt('timer_interval',
default=20,
help=_('Timeout (second) of packing for multiple '
'auto healing.')),
cfg.StrOpt('auto_scaling_package',
default='tacker.sol_refactored.common.prometheus_plugin',
help=_('Package name for auto scaling. '

View File

@ -790,6 +790,77 @@ class PrometheusPluginFm(PrometheusPlugin, mon_base.MonitoringPlugin):
return result
class PrometheusPluginAutoHealing(PrometheusPlugin, mon_base.MonitoringPlugin):
_instance = None
@staticmethod
def instance():
if PrometheusPluginAutoHealing._instance is None:
if not CONF.prometheus_plugin.auto_healing:
stub = mon_base.MonitoringPluginStub.instance()
PrometheusPluginAutoHealing._instance = stub
else:
PrometheusPluginAutoHealing()
return PrometheusPluginAutoHealing._instance
def __init__(self):
if PrometheusPluginAutoHealing._instance:
raise SystemError(
"Not constructor but instance() should be used.")
super(PrometheusPluginAutoHealing, self).__init__()
self.set_callback(self.default_callback)
PrometheusPluginAutoHealing._instance = self
def set_callback(self, notification_callback):
self.notification_callback = notification_callback
def alert(self, **kwargs):
try:
self._alert(kwargs['request'], body=kwargs['body'])
except Exception as e:
# All exceptions is ignored here and 204 response will always
# be returned. Because when tacker responds error to alertmanager,
# alertmanager may repeat the same reports.
LOG.error("%s: %s", e.__class__.__name__, e.args[0])
def default_callback(self, context, vnf_instance_id, vnfc_info_id):
self.rpc.vnfm_auto_heal_queue(context, vnf_instance_id, vnfc_info_id)
@validator.schema(prometheus_plugin_schemas.AlertMessage)
def _alert(self, request, body):
context = request.context
for alt in body['alerts']:
if alt['status'] != 'firing':
continue
if alt['labels']['receiver_type'] != 'tacker':
continue
if alt['labels']['function_type'] != 'auto_heal':
continue
vnf_instance_id = alt['labels']['vnf_instance_id']
inst = inst_utils.get_inst(context, vnf_instance_id)
if inst.instantiationState != 'INSTANTIATED':
self.rpc.vnfm_auto_heal_remove_timer(
None, vnf_instance_id)
if (not inst.obj_attr_is_set('vnfConfigurableProperties')
or not inst.vnfConfigurableProperties.get(
'isAutohealEnabled')):
continue
vnfc_info_id = alt['labels']['vnfc_info_id']
result = {
vnfcInfo for vnfcInfo in inst.instantiatedVnfInfo.vnfcInfo
if vnfcInfo.id == vnfc_info_id
}
if not result:
continue
if self.notification_callback:
self.notification_callback(
context, vnf_instance_id, vnfc_info_id)
class PrometheusPluginAutoScaling(PrometheusPlugin, mon_base.MonitoringPlugin):
_instance = None
@ -808,7 +879,7 @@ class PrometheusPluginAutoScaling(PrometheusPlugin, mon_base.MonitoringPlugin):
raise SystemError(
"Not constructor but instance() should be used.")
super(PrometheusPluginAutoScaling, self).__init__()
self.notification_callback = self.default_callback
self.set_callback(self.default_callback)
PrometheusPluginAutoScaling._instance = self
def set_callback(self, notification_callback):
@ -824,41 +895,48 @@ class PrometheusPluginAutoScaling(PrometheusPlugin, mon_base.MonitoringPlugin):
LOG.error("%s: %s", e.__class__.__name__, e.args[0])
def default_callback(self, context, vnf_instance_id, scaling_param):
self.rpc.request_scale(context, vnf_instance_id, scaling_param)
self.rpc.trigger_scale(context, vnf_instance_id, scaling_param)
def skip_if_auto_scale_not_enabled(self, vnf_instance):
if (not vnf_instance.obj_attr_is_set('vnfConfigurableProperties') or
not vnf_instance.vnfConfigurableProperties.get(
'isAutoscaleEnabled')):
raise sol_ex.PrometheusPluginSkipped()
def process_auto_scale(self, request, vnf_instance_id, auto_scale_type,
def process_auto_scale(self, context, vnf_instance_id, auto_scale_type,
aspect_id):
scaling_param = {
'type': auto_scale_type,
'aspectId': aspect_id,
}
context = request.context
if self.notification_callback:
self.notification_callback(context, vnf_instance_id, scaling_param)
self.notification_callback(
context, vnf_instance_id, scaling_param)
@validator.schema(prometheus_plugin_schemas.AlertMessage)
def _alert(self, request, body):
result = []
context = request.context
for alt in body['alerts']:
if alt['status'] != 'firing':
continue
if alt['labels']['receiver_type'] != 'tacker':
continue
if alt['labels']['function_type'] != 'auto_scale':
continue
try:
vnf_instance_id = alt['labels']['vnf_instance_id']
auto_scale_type = alt['labels']['auto_scale_type']
aspect_id = alt['labels']['aspect_id']
context = request.context
inst = inst_utils.get_inst(context, vnf_instance_id)
self.skip_if_auto_scale_not_enabled(inst)
self.process_auto_scale(
request, vnf_instance_id, auto_scale_type, aspect_id)
result.append((vnf_instance_id, auto_scale_type, aspect_id))
except sol_ex.PrometheusPluginSkipped:
pass
return result
vnf_instance_id = alt['labels']['vnf_instance_id']
inst = inst_utils.get_inst(context, vnf_instance_id)
if inst.instantiationState != 'INSTANTIATED':
continue
if (not inst.obj_attr_is_set('vnfConfigurableProperties') or
not inst.vnfConfigurableProperties.get(
'isAutoscaleEnabled')):
continue
aspect_id = alt['labels']['aspect_id']
result = {
scaleStatus for scaleStatus in
inst.instantiatedVnfInfo.scaleStatus
if scaleStatus.aspectId == aspect_id
}
if not result:
continue
auto_scale_type = alt['labels']['auto_scale_type']
self.process_auto_scale(
context, vnf_instance_id, auto_scale_type, aspect_id)

View File

@ -0,0 +1,130 @@
# Copyright (C) 2023 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from oslo_utils import uuidutils
from tacker.sol_refactored.common import coordinate
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import lcm_op_occ_utils as lcmocc_utils
from tacker.sol_refactored.common import vnf_instance_utils as inst_utils
from tacker.sol_refactored.conductor import conductor_rpc_v2
from tacker.sol_refactored import objects
from tacker.sol_refactored.objects.v2 import fields as v2fields
@coordinate.lock_vnf_instance('{vnf_instance_id}')
def heal(context, vnf_instance_id, body):
inst = inst_utils.get_inst(context, vnf_instance_id)
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=vnf_instance_id)
lcmocc_utils.check_lcmocc_in_progress(context, vnf_instance_id)
# check parameter for later use
is_all = body.get('additionalParams', {}).get('all', False)
if not isinstance(is_all, bool):
raise sol_ex.SolValidationError(
detail="additionalParams['all'] must be bool.")
if 'vnfcInstanceId' in body:
inst_info = inst.instantiatedVnfInfo
vnfc_id = []
if inst_info.obj_attr_is_set('vnfcInfo'):
vnfc_id = [vnfc.id for vnfc in inst_info.vnfcInfo]
for req_vnfc_id in body['vnfcInstanceId']:
if req_vnfc_id not in vnfc_id:
raise sol_ex.SolValidationError(
detail="vnfcInstanceId(%s) does not exist."
% req_vnfc_id)
lcmocc = new_lcmocc(vnf_instance_id, v2fields.LcmOperationType.HEAL, body)
lcmocc.create(context)
rpc = conductor_rpc_v2.VnfLcmRpcApiV2()
rpc.start_lcm_op(context, lcmocc.id)
return lcmocc
@coordinate.lock_vnf_instance('{vnf_instance_id}')
def scale(context, vnf_instance_id, body):
inst = inst_utils.get_inst(context, vnf_instance_id)
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=vnf_instance_id)
lcmocc_utils.check_lcmocc_in_progress(context, vnf_instance_id)
# check parameters
aspect_id = body['aspectId']
if 'numberOfSteps' not in body:
# set default value (1) defined by SOL specification for
# the convenience of the following methods.
body['numberOfSteps'] = 1
scale_level = _get_current_scale_level(inst, aspect_id)
max_scale_level = _get_max_scale_level(inst, aspect_id)
if scale_level is None or max_scale_level is None:
raise sol_ex.InvalidScaleAspectId(aspect_id=aspect_id)
num_steps = body['numberOfSteps']
if body['type'] == 'SCALE_IN':
num_steps *= -1
scale_level += num_steps
if scale_level < 0 or scale_level > max_scale_level:
raise sol_ex.InvalidScaleNumberOfSteps(
num_steps=body['numberOfSteps'])
lcmocc = new_lcmocc(vnf_instance_id, v2fields.LcmOperationType.SCALE, body)
lcmocc.create(context)
rpc = conductor_rpc_v2.VnfLcmRpcApiV2()
rpc.start_lcm_op(context, lcmocc.id)
return lcmocc
def _get_current_scale_level(inst, aspect_id):
if (inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set('scaleStatus')):
for scale_info in inst.instantiatedVnfInfo.scaleStatus:
if scale_info.aspectId == aspect_id:
return scale_info.scaleLevel
def _get_max_scale_level(inst, aspect_id):
if (inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set('maxScaleLevels')):
for scale_info in inst.instantiatedVnfInfo.maxScaleLevels:
if scale_info.aspectId == aspect_id:
return scale_info.scaleLevel
def new_lcmocc(inst_id, operation, req_body,
op_state=v2fields.LcmOperationStateType.STARTING):
now = datetime.utcnow()
lcmocc = objects.VnfLcmOpOccV2(
id=uuidutils.generate_uuid(),
operationState=op_state,
stateEnteredTime=now,
startTime=now,
vnfInstanceId=inst_id,
operation=operation,
isAutomaticInvocation=False,
isCancelPending=False,
operationParams=req_body)
return lcmocc

View File

@ -95,5 +95,15 @@ class PrometheusPluginConductor(object):
def store_job_info(self, context, report):
self.cast(context, 'store_job_info', report=report)
def request_scale(self, context, id, scale_req):
self.cast(context, 'request_scale', id=id, scale_req=scale_req)
def trigger_scale(self, context, id, scale_req):
self.cast(context, 'trigger_scale', id=id, scale_req=scale_req)
def vnfm_auto_heal_queue(
self, context, vnf_instance_id, vnfc_info_id):
self.cast(context, 'vnfm_auto_heal_queue',
vnf_instance_id=vnf_instance_id,
vnfc_info_id=vnfc_info_id)
def vnfm_auto_heal_remove_timer(self, context, vnf_instance_id):
self.cast(context, 'vnfm_auto_heal_remove_timer',
vnf_instance_id=vnf_instance_id)

View File

@ -385,8 +385,17 @@ class ConductorV2(object):
self.vnfpm_driver.store_job_info(context, report)
@log.log
def request_scale(self, context, id, scale_req):
self.prom_driver.request_scale(context, id, scale_req)
def trigger_scale(self, context, id, scale_req):
self.prom_driver.trigger_scale(context, id, scale_req)
@log.log
def vnfm_auto_heal_queue(
self, context, vnf_instance_id, vnfc_info_id):
self.prom_driver.heal_queue(context, vnf_instance_id, vnfc_info_id)
@log.log
def vnfm_auto_heal_remove_timer(self, context, vnf_instance_id):
self.prom_driver.remove_timer(vnf_instance_id)
@log.log
def server_notification_notify(

View File

@ -13,17 +13,66 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
from oslo_log import log as logging
from tacker.sol_refactored.common import config as cfg
from tacker.sol_refactored.common import http_client
from tacker.sol_refactored.common import vnflcm_utils
from tacker.sol_refactored import objects
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class VnfmAutoHealTimer():
def __init__(self, context, vnf_instance_id,
expiration_time, expiration_handler):
self.lock = threading.Lock()
self.expired = False
self.queue = set()
self.context = context
self.vnf_instance_id = vnf_instance_id
self.expiration_handler = expiration_handler
self.timer = threading.Timer(expiration_time, self.expire)
self.timer.start()
def expire(self):
_expired = False
with self.lock:
if not self.expired:
self._cancel()
_expired = True
if _expired:
self.expiration_handler(
self.context, self.vnf_instance_id, list(self.queue))
def add_vnfc_info_id(self, vnfc_info_id):
with self.lock:
if not self.expired:
self.queue.add(vnfc_info_id)
def _cancel(self):
self.timer.cancel()
self.expired = True
def cancel(self):
with self.lock:
if not self.expired:
self._cancel()
def __del__(self):
self.cancel()
class PrometheusPluginDriverStub():
def request_scale(self, context, vnf_instance_id, scale_req):
def trigger_scale(self, context, vnf_instance_id, scale_req):
pass
def heal_queue(self, context, vnf_instance_id, vnfc_info_id):
pass
def remove_timer(self, vnf_instance_id):
pass
@ -34,6 +83,7 @@ class PrometheusPluginDriver():
def instance():
if PrometheusPluginDriver._instance is None:
if (CONF.prometheus_plugin.auto_scaling or
CONF.prometheus_plugin.auto_healing or
CONF.prometheus_plugin.fault_management or
CONF.prometheus_plugin.performance_management):
PrometheusPluginDriver()
@ -45,19 +95,35 @@ class PrometheusPluginDriver():
def __init__(self):
if PrometheusPluginDriver._instance:
raise SystemError("Not constructor but instance() should be used.")
auth_handle = http_client.KeystonePasswordAuthHandle(
auth_url=CONF.keystone_authtoken.auth_url,
username=CONF.keystone_authtoken.username,
password=CONF.keystone_authtoken.password,
project_name=CONF.keystone_authtoken.project_name,
user_domain_name=CONF.keystone_authtoken.user_domain_name,
project_domain_name=CONF.keystone_authtoken.project_domain_name)
self.client = http_client.HttpClient(auth_handle)
PrometheusPluginDriver._instance = self
self.timer_map = {}
self.expiration_time = CONF.prometheus_plugin.timer_interval
def request_scale(self, context, vnf_instance_id, scale_req):
ep = CONF.v2_vnfm.endpoint
url = f'{ep}/vnflcm/v2/vnf_instances/{vnf_instance_id}/scale'
resp, _ = self.client.do_request(
url, "POST", context=context, body=scale_req, version="2.0.0")
LOG.info("AutoHealing request is processed: %d.", resp.status_code)
def heal_queue(self, context, vnf_instance_id, vnfc_info_id):
if vnf_instance_id not in self.timer_map:
self.timer_map[vnf_instance_id] = VnfmAutoHealTimer(
context, vnf_instance_id, self.expiration_time,
self.timer_expired)
self.timer_map[vnf_instance_id].add_vnfc_info_id(vnfc_info_id)
def remove_timer(self, vnf_instance_id):
if vnf_instance_id in self.timer_map:
self.timer_map[vnf_instance_id].cancel()
del self.timer_map[vnf_instance_id]
def trigger_heal(self, context, vnf_instance_id, vnfc_info_ids):
heal_req = objects.HealVnfRequest(vnfcInstanceId=vnfc_info_ids)
body = heal_req.to_dict()
LOG.info(f"VNFM AutoHealing is triggered. vnf: {vnf_instance_id}, "
f"vnfcInstanceId: {vnfc_info_ids}")
vnflcm_utils.heal(context, vnf_instance_id, body)
def timer_expired(self, context, vnf_instance_id, vnfc_info_ids):
self.remove_timer(vnf_instance_id)
self.trigger_heal(context, vnf_instance_id, vnfc_info_ids)
def trigger_scale(self, context, vnf_instance_id, scale_req):
LOG.info(f"VNFM AutoScaling is triggered. vnf: {vnf_instance_id}, "
f"type: {scale_req['type']}, aspectId: "
f"{scale_req['aspectId']}")
vnflcm_utils.scale(context, vnf_instance_id, scale_req)

View File

@ -47,6 +47,19 @@ class FmAlertController(prom_wsgi.PrometheusPluginAPIController):
return prom_wsgi.PrometheusPluginResponse(204, None)
class AutoHealingController(prom_wsgi.PrometheusPluginAPIController):
def auto_healing(self, request, body):
if not CONF.prometheus_plugin.auto_healing:
raise sol_ex.PrometheusPluginNotEnabled(
name='Auto healing')
cls = mon_base.get_class(
CONF.prometheus_plugin.auto_healing_package,
CONF.prometheus_plugin.auto_healing_class)
mon_base.MonitoringPlugin.get_instance(cls).alert(
request=request, body=body)
return prom_wsgi.PrometheusPluginResponse(204, None)
class AutoScalingController(prom_wsgi.PrometheusPluginAPIController):
def auto_scaling(self, request, body):
if not CONF.prometheus_plugin.auto_scaling:
@ -58,6 +71,3 @@ class AutoScalingController(prom_wsgi.PrometheusPluginAPIController):
mon_base.MonitoringPlugin.get_instance(cls).alert(
request=request, body=body)
return prom_wsgi.PrometheusPluginResponse(204, None)
def auto_scaling_id(self, request, _, body):
return self.auto_scaling(request, body)

View File

@ -14,8 +14,6 @@
# under the License.
from datetime import datetime
from oslo_log import log as logging
from oslo_utils import uuidutils
@ -30,6 +28,7 @@ from tacker.sol_refactored.common import lcm_op_occ_utils as lcmocc_utils
from tacker.sol_refactored.common import subscription_utils as subsc_utils
from tacker.sol_refactored.common import vim_utils
from tacker.sol_refactored.common import vnf_instance_utils as inst_utils
from tacker.sol_refactored.common import vnflcm_utils
from tacker.sol_refactored.conductor import conductor_rpc_v2
from tacker.sol_refactored.controller import vnflcm_view
from tacker.sol_refactored.nfvo import nfvo_client
@ -153,22 +152,6 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
self.endpoint)
return sol_wsgi.SolResponse(204, None)
def _new_lcmocc(self, inst_id, operation, req_body,
op_state=v2fields.LcmOperationStateType.STARTING):
now = datetime.utcnow()
lcmocc = objects.VnfLcmOpOccV2(
id=uuidutils.generate_uuid(),
operationState=op_state,
stateEnteredTime=now,
startTime=now,
vnfInstanceId=inst_id,
operation=operation,
isAutomaticInvocation=False,
isCancelPending=False,
operationParams=req_body)
return lcmocc
@validator.schema(schema.VnfInfoModificationRequest_V200, '2.0.0')
@coordinate.lock_vnf_instance('{id}')
def update(self, request, id, body):
@ -205,7 +188,8 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
detail="vnfcInstanceId(%s) does not exist."
% vnfc_mod['id'])
lcmocc = self._new_lcmocc(id, v2fields.LcmOperationType.MODIFY_INFO,
lcmocc = vnflcm_utils.new_lcmocc(
id, v2fields.LcmOperationType.MODIFY_INFO,
body, v2fields.LcmOperationStateType.PROCESSING)
lcmocc.create(context)
@ -226,8 +210,8 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
lcmocc_utils.check_lcmocc_in_progress(context, id)
lcmocc = self._new_lcmocc(id, v2fields.LcmOperationType.INSTANTIATE,
body)
lcmocc = vnflcm_utils.new_lcmocc(
id, v2fields.LcmOperationType.INSTANTIATE, body)
req_param = lcmocc.operationParams
# if there is partial vimConnectionInfo check and fulfill here.
@ -260,8 +244,8 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
lcmocc_utils.check_lcmocc_in_progress(context, id)
lcmocc = self._new_lcmocc(id, v2fields.LcmOperationType.TERMINATE,
body)
lcmocc = vnflcm_utils.new_lcmocc(
id, v2fields.LcmOperationType.TERMINATE, body)
lcmocc.create(context)
self.conductor_rpc.start_lcm_op(context, lcmocc.id)
@ -270,93 +254,19 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
return sol_wsgi.SolResponse(202, None, location=location)
def _get_current_scale_level(self, inst, aspect_id):
if (inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set('scaleStatus')):
for scale_info in inst.instantiatedVnfInfo.scaleStatus:
if scale_info.aspectId == aspect_id:
return scale_info.scaleLevel
def _get_max_scale_level(self, inst, aspect_id):
if (inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set('maxScaleLevels')):
for scale_info in inst.instantiatedVnfInfo.maxScaleLevels:
if scale_info.aspectId == aspect_id:
return scale_info.scaleLevel
@validator.schema(schema.ScaleVnfRequest_V200, '2.0.0')
@coordinate.lock_vnf_instance('{id}')
def scale(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=id)
lcmocc_utils.check_lcmocc_in_progress(context, id)
# check parameters
aspect_id = body['aspectId']
if 'numberOfSteps' not in body:
# set default value (1) defined by SOL specification for
# the convenience of the following methods.
body['numberOfSteps'] = 1
scale_level = self._get_current_scale_level(inst, aspect_id)
max_scale_level = self._get_max_scale_level(inst, aspect_id)
if scale_level is None or max_scale_level is None:
raise sol_ex.InvalidScaleAspectId(aspect_id=aspect_id)
num_steps = body['numberOfSteps']
if body['type'] == 'SCALE_IN':
num_steps *= -1
scale_level += num_steps
if scale_level < 0 or scale_level > max_scale_level:
raise sol_ex.InvalidScaleNumberOfSteps(
num_steps=body['numberOfSteps'])
lcmocc = self._new_lcmocc(id, v2fields.LcmOperationType.SCALE,
body)
lcmocc.create(context)
self.conductor_rpc.start_lcm_op(context, lcmocc.id)
lcmocc = vnflcm_utils.scale(context, id, body)
location = lcmocc_utils.lcmocc_href(lcmocc.id, self.endpoint)
return sol_wsgi.SolResponse(202, None, location=location)
@validator.schema(schema.HealVnfRequest_V200, '2.0.0')
@coordinate.lock_vnf_instance('{id}')
def heal(self, request, id, body):
context = request.context
inst = inst_utils.get_inst(context, id)
if inst.instantiationState != 'INSTANTIATED':
raise sol_ex.VnfInstanceIsNotInstantiated(inst_id=id)
lcmocc_utils.check_lcmocc_in_progress(context, id)
# check parameter for later use
is_all = body.get('additionalParams', {}).get('all', False)
if not isinstance(is_all, bool):
raise sol_ex.SolValidationError(
detail="additionalParams['all'] must be bool.")
if 'vnfcInstanceId' in body:
inst_info = inst.instantiatedVnfInfo
vnfc_id = []
if inst_info.obj_attr_is_set('vnfcInfo'):
vnfc_id = [vnfc.id for vnfc in inst_info.vnfcInfo]
for req_vnfc_id in body['vnfcInstanceId']:
if req_vnfc_id not in vnfc_id:
raise sol_ex.SolValidationError(
detail="vnfcInstanceId(%s) does not exist."
% req_vnfc_id)
lcmocc = self._new_lcmocc(id, v2fields.LcmOperationType.HEAL, body)
lcmocc.create(context)
self.conductor_rpc.start_lcm_op(context, lcmocc.id)
lcmocc = vnflcm_utils.heal(context, id, body)
location = lcmocc_utils.lcmocc_href(lcmocc.id, self.endpoint)
@ -373,7 +283,7 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
lcmocc_utils.check_lcmocc_in_progress(context, id)
lcmocc = self._new_lcmocc(
lcmocc = vnflcm_utils.new_lcmocc(
id, v2fields.LcmOperationType.CHANGE_EXT_CONN, body)
lcmocc.create(context)
@ -458,8 +368,8 @@ class VnfLcmControllerV2(sol_wsgi.SolAPIController):
raise sol_ex.SolValidationError(
detail="'lcm-kubernetes-def-files' must be specified")
lcmocc = self._new_lcmocc(id, v2fields.LcmOperationType.CHANGE_VNFPKG,
body)
lcmocc = vnflcm_utils.new_lcmocc(
id, v2fields.LcmOperationType.CHANGE_VNFPKG, body)
lcmocc.create(context)

View File

@ -707,6 +707,40 @@ class TestPrometheusPluginFm(base.TestCase):
pp._alert, self.request)
class TestPrometheusPluginAutoHealing(base.TestCase):
def setUp(self):
super(TestPrometheusPluginAutoHealing, self).setUp()
objects.register_all()
self.context = context.get_admin_context()
self.request = mock.Mock()
self.request.context = self.context
prometheus_plugin.PrometheusPluginAutoHealing._instance = None
def tearDown(self):
super(TestPrometheusPluginAutoHealing, self).tearDown()
# delete singleton object
prometheus_plugin.PrometheusPluginAutoHealing._instance = None
def test_constructor_error(self):
self.config_fixture.config(
group='prometheus_plugin', auto_healing=False)
mon_base.MonitoringPlugin.get_instance(
prometheus_plugin.PrometheusPluginAutoHealing)
self.assertRaises(
SystemError,
prometheus_plugin.PrometheusPluginAutoHealing)
def test_constructor_stub(self):
self.config_fixture.config(
group='prometheus_plugin', auto_healing=False)
pp = mon_base.MonitoringPlugin.get_instance(
prometheus_plugin.PrometheusPluginAutoHealing)
self.assertIsInstance(pp._instance, mon_base.MonitoringPluginStub)
pp = mon_base.MonitoringPlugin.get_instance(
prometheus_plugin.PrometheusPluginAutoHealing)
self.assertIsInstance(pp._instance, mon_base.MonitoringPluginStub)
class TestPrometheusPluginAutoScaling(base.TestCase):
def setUp(self):
super(TestPrometheusPluginAutoScaling, self).setUp()

View File

@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import webob
import time
from tacker import context
from tacker.sol_refactored.common import http_client
from tacker.sol_refactored.common import vnflcm_utils
from tacker.sol_refactored.conductor import conductor_v2
from tacker.sol_refactored.conductor import prometheus_plugin_driver as pp_drv
from tacker.sol_refactored import objects
@ -119,21 +119,25 @@ class TestPrometheusPlugin(db_base.SqlTestCase):
self.context = context.get_admin_context()
self.request = mock.Mock()
self.request.context = self.context
self.timer_test = (None, None)
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
self.conductor = conductor_v2.ConductorV2()
pp_drv.PrometheusPluginDriver._instance = None
@mock.patch.object(http_client.HttpClient, 'do_request')
def test_request_scale(self, mock_do_request):
resp = webob.Response()
resp.status_code = 202
mock_do_request.return_value = resp, {}
def tearDown(self):
super(TestPrometheusPlugin, self).tearDown()
pp_drv.PrometheusPluginDriver._instance = None
@mock.patch.object(vnflcm_utils, 'scale')
def test_request_scale(self, mock_do_scale):
scale_req = {
'type': 'SCALE_OUT',
'aspect_id': 'vdu',
'aspectId': 'vdu',
}
self.conductor.request_scale(
self.conductor.trigger_scale(
self.context, 'vnf_instance_id', scale_req)
self.assertEqual(1, mock_do_scale.call_count)
def test_constructor(self):
self.config_fixture.config(
@ -145,8 +149,7 @@ class TestPrometheusPlugin(db_base.SqlTestCase):
group='prometheus_plugin', performance_management=False)
pp_drv.PrometheusPluginDriver._instance = None
drv = pp_drv.PrometheusPluginDriver.instance()
drv = pp_drv.PrometheusPluginDriver.instance()
drv.request_scale(None, None, None)
drv.trigger_scale(None, None, None)
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
pp_drv.PrometheusPluginDriver._instance = None
@ -160,3 +163,73 @@ class TestPrometheusPlugin(db_base.SqlTestCase):
self.assertRaises(
SystemError,
pp_drv.PrometheusPluginDriver)
def test_conductor_vnfm_auto_heal_queue(self):
self.config_fixture.config(
group='prometheus_plugin', auto_healing=True)
pp_drv.PrometheusPluginDriver._instance = None
self.conductor.prom_driver = pp_drv.PrometheusPluginDriver.instance()
self.config_fixture.config(
group='prometheus_plugin', timer_interval=1)
# queueing test
id = 'test_id'
self.conductor.vnfm_auto_heal_queue(
self.context, id, 'id')
self.conductor.vnfm_auto_heal_queue(
self.context, id, 'id2')
self.assertEqual(
self.conductor.prom_driver.timer_map[id].queue,
{'id', 'id2'})
# Since the timeout period of `VnfmAutoHealTimer` is set to 1 second,
# it is also necessary to wait for 1 second before asserting.
time.sleep(1)
# remove_timer test
self.conductor.vnfm_auto_heal_remove_timer(self.context, id)
self.assertNotIn(id, self.conductor.prom_driver.timer_map)
# remove_timer test: invalid_id
self.conductor.vnfm_auto_heal_remove_timer(
self.context, 'invalid_id')
@mock.patch.object(vnflcm_utils, 'heal')
def test_conductor_timer_expired(self, mock_do_heal):
self.config_fixture.config(
group='prometheus_plugin', auto_healing=True)
pp_drv.PrometheusPluginDriver._instance = None
self.conductor.prom_driver = pp_drv.PrometheusPluginDriver.instance()
self.conductor.prom_driver.timer_expired(
self.context, 'test_id', ['id'])
def expired(self, context, id, queue):
queue.sort()
self.timer_test = (id, queue)
def test_timer(self):
# queueing test
timer = pp_drv.VnfmAutoHealTimer(self.context, 'id', 1, self.expired)
timer.add_vnfc_info_id('1')
timer.add_vnfc_info_id('3')
# Since the timeout period of `VnfmAutoHealTimer` is set to 1 second,
# it is also necessary to wait for 1 second before asserting.
time.sleep(1)
self.assertEqual(self.timer_test[0], 'id')
self.assertEqual(self.timer_test[1], ['1', '3'])
def test_timer_cancel(self):
# cancel test
timer = pp_drv.VnfmAutoHealTimer(self.context, 'id2', 1, self.expired)
timer.add_vnfc_info_id('5')
timer.cancel()
# Since the timeout period of `VnfmAutoHealTimer` is set to 1 second,
# it is also necessary to wait for 1 second before asserting.
time.sleep(1)
self.assertIsNone(self.timer_test[0])
self.assertIsNone(self.timer_test[1])
def test_timer_destructor(self):
# method call after cancel()
timer = pp_drv.VnfmAutoHealTimer(self.context, 'id', 1, self.expired)
timer.cancel()
timer.expire()
timer.add_vnfc_info_id(['4'])
timer.cancel()
timer.__del__()

View File

@ -145,15 +145,65 @@ _body_scale_alert1 = {
'fingerprint': '5ef77f1f8a3ecb8d'
}
_body_heal_alert1 = {
'status': 'firing',
'labels': {
'receiver_type': 'tacker',
'function_type': 'auto_heal',
'vnf_instance_id': 'vnf instance id',
'vnfc_info_id': 'vnfc info id'
},
'annotations': {
},
'startsAt': '2022-06-21T23:47:36.453Z',
'endsAt': '0001-01-01T00:00:00Z',
'generatorURL': 'http://controller147:9090/graph?g0.expr='
'up%7Bjob%3D%22node%22%7D+%3D%3D+0&g0.tab=1',
'fingerprint': '5ef77f1f8a3ecb8d'
}
# function_type mismatch
_body_scale_alert2 = copy.deepcopy(_body_scale_alert1)
_body_scale_alert2['labels']['function_type'] = 'vnffm'
_body_scale_alert3 = copy.deepcopy(_body_scale_alert1)
_body_scale_alert3['status'] = 'resolved'
_body_scale_alert4 = copy.deepcopy(_body_scale_alert1)
_body_scale_alert4['labels']['function_type'] = 'auto_heal'
_body_scale_alert5 = copy.deepcopy(_body_scale_alert1)
_body_scale_alert5['labels']['aspect_id'] = 'aspect id'
_body_scale = copy.deepcopy(_body_base)
_body_scale.update({
'alerts': [_body_scale_alert1, _body_scale_alert2]
})
_body_scale_continue = copy.deepcopy(_body_base)
_body_scale_continue.update({
'alerts': [_body_scale_alert3, _body_scale_alert4, _body_scale_alert5]
})
_body_heal = copy.deepcopy(_body_base)
_body_heal.update({
'alerts': [_body_heal_alert1]
})
_body_heal_alert2 = copy.deepcopy(_body_heal_alert1)
_body_heal_alert2['status'] = 'resolved'
_body_heal_alert3 = copy.deepcopy(_body_heal_alert1)
_body_heal_alert3['labels']['function_type'] = 'auto_scale'
_body_heal_alert4 = copy.deepcopy(_body_heal_alert1)
_body_heal_alert4['labels']['vnfc_info_id'] = 'vnfcInfoId'
_body_heal_continue = copy.deepcopy(_body_base)
_body_heal_continue.update({
'alerts': [_body_heal_alert2, _body_heal_alert3, _body_heal_alert4]
})
_inst1 = {
'id': 'test_id',
'vnfdId': 'vnfdId',
@ -187,6 +237,14 @@ _inst1 = {
'vduId': 'vdu_id',
'vnfcResourceInfoId': 'id2',
'vnfcState': 'STARTED'
}, {
'id': 'vnfc info id',
'vduId': 'vdu_id',
'vnfcResourceInfoId': 'id2',
'vnfcState': 'STARTED'
}],
'scaleStatus': [{
'aspectId': 'aspect'
}]
},
'metadata': {
@ -198,6 +256,31 @@ _inst2.update({
'vnfConfigurableProperties': {
'isAutoscaleEnabled': True
},
'instantiationState': 'INSTANTIATED'
})
_inst3 = copy.deepcopy(_inst1)
_inst3.update({
'vnfConfigurableProperties': {
'isAutoscaleEnabled': False
},
'instantiationState': 'INSTANTIATED'
})
_inst4 = copy.deepcopy(_inst1)
_inst4.update({
'vnfConfigurableProperties': {
'isAutohealEnabled': False
},
'instantiationState': 'INSTANTIATED'
})
_inst5 = copy.deepcopy(_inst1)
_inst5.update({
'vnfConfigurableProperties': {
'isAutohealEnabled': True
},
'instantiationState': 'INSTANTIATED'
})
datetime_test = datetime.datetime.fromisoformat(
@ -324,6 +407,64 @@ class TestPrometheusPluginFm(base.TestCase):
self.assertEqual(204, result.status)
class TestPrometheusPluginAutoHealing(base.TestCase):
def setUp(self):
super(TestPrometheusPluginAutoHealing, self).setUp()
objects.register_all()
self.context = context.get_admin_context()
self.request = mock.Mock()
self.request.context = self.context
self.controller = prometheus_plugin_controller.AutoHealingController()
plugin.PrometheusPluginAutoHealing._instance = None
def tearDown(self):
super(TestPrometheusPluginAutoHealing, self).tearDown()
# delete singleton object
plugin.PrometheusPluginAutoHealing._instance = None
def test_auto_healing_config_false(self):
self.config_fixture.config(
group='prometheus_plugin', auto_healing=False)
self.assertRaises(
sol_ex.PrometheusPluginNotEnabled,
self.controller.auto_healing, self.request, {})
@mock.patch.object(inst_utils, 'get_inst')
def test_auto_healing_no_autoheal_enabled(self, mock_inst):
self.config_fixture.config(
group='prometheus_plugin', auto_healing=True)
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst4)
result = self.controller.auto_healing(
self.request, _body_heal)
self.assertEqual(204, result.status)
@mock.patch.object(inst_utils, 'get_inst')
def test_auto_healing_is_autoheal_enabled(self, mock_inst):
self.config_fixture.config(
group='prometheus_plugin', auto_healing=True)
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst5)
result = self.controller.auto_healing(self.request, _body_heal)
self.assertEqual(204, result.status)
@mock.patch.object(inst_utils, 'get_inst')
def test_auto_healing_multiple_continue(self, mock_inst):
self.config_fixture.config(
group='prometheus_plugin', auto_healing=True)
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst5)
result = self.controller.auto_healing(
self.request, _body_heal_continue)
self.assertEqual(204, result.status)
@mock.patch.object(inst_utils, 'get_inst')
def test_auto_healing_not_instantiated(self, mock_inst):
self.config_fixture.config(
group='prometheus_plugin', auto_healing=True)
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst1)
result = self.controller.auto_healing(
self.request, _body_heal)
self.assertEqual(204, result.status)
class TestPrometheusPluginAutoScaling(base.TestCase):
def setUp(self):
super(TestPrometheusPluginAutoScaling, self).setUp()
@ -344,15 +485,15 @@ class TestPrometheusPluginAutoScaling(base.TestCase):
group='prometheus_plugin', auto_scaling=False)
self.assertRaises(
sol_ex.PrometheusPluginNotEnabled,
self.controller.auto_scaling_id, self.request, 'id', {})
self.controller.auto_scaling, self.request, {})
@mock.patch.object(inst_utils, 'get_inst')
def test_auto_scaling_no_autoscale_enabled(self, mock_inst):
self.config_fixture.config(
group='prometheus_plugin', auto_scaling=True)
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst1)
result = self.controller.auto_scaling_id(
self.request, 'id', _body_scale)
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst3)
result = self.controller.auto_scaling(
self.request, _body_scale)
self.assertEqual(204, result.status)
@mock.patch.object(inst_utils, 'get_inst')
@ -378,3 +519,21 @@ class TestPrometheusPluginAutoScaling(base.TestCase):
group='prometheus_plugin', auto_scaling=True)
result = self.controller.auto_scaling(self.request, {})
self.assertEqual(204, result.status)
@mock.patch.object(inst_utils, 'get_inst')
def test_auto_scaling_multiple_continue(self, mock_inst):
self.config_fixture.config(
group='prometheus_plugin', auto_scaling=True)
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst2)
result = self.controller.auto_scaling(
self.request, _body_scale_continue)
self.assertEqual(204, result.status)
@mock.patch.object(inst_utils, 'get_inst')
def test_auto_scaling_not_instantiated(self, mock_inst):
self.config_fixture.config(
group='prometheus_plugin', auto_scaling=True)
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst1)
result = self.controller.auto_scaling(
self.request, _body_scale)
self.assertEqual(204, result.status)