tacker/tacker/tests/unit/sol_refactored/conductor/test_prometheus_plugin.py

163 lines
5.3 KiB
Python

# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from tacker import context
from tacker.sol_refactored.common import http_client
from tacker.sol_refactored.conductor import conductor_v2
from tacker.sol_refactored.conductor import prometheus_plugin_driver as pp_drv
from tacker.sol_refactored import objects
from tacker.tests.unit.db import base as db_base
from unittest import mock
_req1 = {
'flavourId': 'flavour'
}
_req2 = {
'flavourId': 'flavour',
'additionalParams': {},
}
_req3 = {
'flavourId': 'flavour',
'additionalParams': {
'monitoring': {
'monitorName': 'prometheus',
'driverType': 'external',
'targetsInfo': [
{
'prometheusHost':
'prometheusHost',
'prometheusHostPort': '22',
'authInfo': {
'ssh_username': 'ssh_username',
'ssh_password': 'ssh_password'
},
'alertRuleConfigPath':
'alertRuleConfigPath',
'prometheusReloadApiEndpoint':
'prometheusReloadApiEndpoint'
}
]
}
},
}
_inst1 = {
'id': '25b9b9d0-2461-4109-866e-a7767375415b',
'vnfdId': 'vnfdId',
'vnfProvider': 'vnfProvider',
'vnfProductName': 'vnfProductName',
'vnfSoftwareVersion': 'vnfSoftwareVersion',
'vnfdVersion': 'vnfdVersion',
'instantiationState': 'NOT_INSTANTIATED',
}
_inst2 = {
'id': '25b9b9d0-2461-4109-866e-a7767375415b',
'vnfdId': 'vnfdId',
'vnfProvider': 'vnfProvider',
'vnfProductName': 'vnfProductName',
'vnfSoftwareVersion': 'vnfSoftwareVersion',
'vnfdVersion': 'vnfdVersion',
'instantiationState': 'NOT_INSTANTIATED',
'metadata': {}
}
_inst3 = {
'id': '25b9b9d0-2461-4109-866e-a7767375415b',
'vnfdId': 'vnfdId',
'vnfProvider': 'vnfProvider',
'vnfProductName': 'vnfProductName',
'vnfSoftwareVersion': 'vnfSoftwareVersion',
'vnfdVersion': 'vnfdVersion',
'instantiationState': 'NOT_INSTANTIATED',
'metadata': {
'monitoring': {
'monitorName': 'prometheus',
'driverType': 'external',
'targetsInfo': [
{
'prometheusHost':
'prometheusHost',
'prometheusHostPort': '22',
'authInfo': {
'ssh_username': 'ssh_username',
'ssh_password': 'ssh_password'
},
'alertRuleConfigPath':
'alertRuleConfigPath',
'prometheusReloadApiEndpoint':
'prometheusReloadApiEndpoint'
}
]
}
}
}
class TestPrometheusPlugin(db_base.SqlTestCase):
def setUp(self):
super(TestPrometheusPlugin, self).setUp()
objects.register_all()
self.context = context.get_admin_context()
self.request = mock.Mock()
self.request.context = self.context
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
self.conductor = conductor_v2.ConductorV2()
@mock.patch.object(http_client.HttpClient, 'do_request')
def test_request_scale(self, mock_do_request):
resp = webob.Response()
resp.status_code = 202
mock_do_request.return_value = resp, {}
scale_req = {
'type': 'SCALE_OUT',
'aspect_id': 'vdu',
}
self.conductor.request_scale(
self.context, 'vnf_instance_id', scale_req)
def test_constructor(self):
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
pp_drv.PrometheusPluginDriver._instance = None
def test_driver_stub(self):
self.config_fixture.config(
group='prometheus_plugin', performance_management=False)
pp_drv.PrometheusPluginDriver._instance = None
drv = pp_drv.PrometheusPluginDriver.instance()
drv = pp_drv.PrometheusPluginDriver.instance()
drv.request_scale(None, None, None)
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
pp_drv.PrometheusPluginDriver._instance = None
drv = pp_drv.PrometheusPluginDriver.instance()
def test_driver_constructor(self):
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
pp_drv.PrometheusPluginDriver._instance = None
pp_drv.PrometheusPluginDriver.instance()
self.assertRaises(
SystemError,
pp_drv.PrometheusPluginDriver)