Support PromQL config file for PrometheusPlugin

The PromQL statement data for PrometheusPlugin is able to
customize with external data file. The operators can use the
original PromQL statement with this file.

Implements: blueprint support-auto-lcm
Change-Id: Ie84eef8098feabaf4a82a33610248dcae5e205c0
This commit is contained in:
Koji Shimizu 2023-01-25 10:43:06 +09:00
parent aac03ceffc
commit 5ab59f7edb
15 changed files with 1201 additions and 322 deletions

View File

@ -238,6 +238,7 @@ function configure_tacker {
cd -
cp $TACKER_DIR/etc/tacker/tacker.conf.sample $TACKER_CONF
cp $TACKER_DIR/etc/tacker/prometheus-plugin.yaml $TACKER_CONF_DIR/prometheus-plugin.yaml
iniset_rpc_backend tacker $TACKER_CONF

View File

@ -321,6 +321,11 @@ Tacker Zed release
- Prometheus: 2.37
- Alertmanager: 0.24
Tacker Antelope release
- Prometheus: 2.37
- Alertmanager: 0.25
Alert rule registration
~~~~~~~~~~~~~~~~~~~~~~~
@ -373,7 +378,7 @@ at "metadata" field.
With the parameter, pod name can be specified but container name can not.
And some prometheus metrics need container name. Therefore, ``max``
statement of PromQL is alternatively used in some measurements to
measure without container name. That means it provids only most
measure without container name. That means it provides only most
impacted value among the containers. For example:
``avg(max(container_fs_usage_bytes{pod=~"pod name"} /
@ -448,6 +453,107 @@ rule file directly. Below is example of alert rule.
vnfc_info_id: VDU1-85adebfa-d71c-49ab-9d39-d8dd7e393541
annotations:
External data file
~~~~~~~~~~~~~~~~~~
The PromQL statement data for Performance Management
is able to customize with external data file. The operators can use the
original PromQL statement with this file.
The external data file includes configuration about PromQL statement for
Performance Management. The template of the file is located
at etc/tacker/prometheus-plugin.yaml from the tacker project source directory.
Edit this file if you need and put it in the configuration directory
(e.g. /etc/tacker).
Default configuration file
--------------------------
Normally, the default external data file is automatically deployed at the
installation process. However if you need to deploy the file manually,
execute below command at the top directory of tacker project.
.. code-block:: console
sudo python3 ./setup.py install
Data format
-----------
The file is described in yaml format [#yaml]_.
Root configuration
------------------
The configuration consists of PromQL config for PMJob API and
PromQL config for Threshold API. The PMJob and the Threshold are
defined in ETSI GS NFV-SOL 003 [#etsi_sol_003]_.
.. code-block:: yaml
# PromQL config for PM Job API
PMJob:
PromQL: <PromQLConfig>
# PromQL config for Threshold API
Threshold:
PromQL: <PromQLConfig>
<PromQLConfig>
--------------
The elements of PromQLConfig are key-value pairs of a performanceMetric
and a PromQL statement. These performanceMetric are defined in
ETSI GS NFV-SOL 003 [#etsi_sol_003]_.
.. code-block:: yaml
<PromQLConfig>
VCpuUsageMeanVnf: <F-string of PromQL statement>
VCpuUsagePeakVnf: <F-string of PromQL statement>
VMemoryUsageMeanVnf: <F-string of PromQL statement>
VMemoryUsagePeakVnf: <F-string of PromQL statement>
VDiskUsageMeanVnf: <F-string of PromQL statement>
VDiskUsagePeakVnf: <F-string of PromQL statement>
ByteIncomingVnfIntCp: <F-string of PromQL statement>
PacketIncomingVnfIntCp: <F-string of PromQL statement>
ByteOutgoingVnfIntCp: <F-string of PromQL statement>
PacketOutgoingVnfIntCp: <F-string of PromQL statement>
ByteIncomingVnfExtCp: <F-string of PromQL statement>
PacketIncomingVnfExtCp: <F-string of PromQL statement>
ByteOutgoingVnfExtCp: <F-string of PromQL statement>
PacketOutgoingVnfExtCp: <F-string of PromQL statement>
For example, VCpuUsageMeanVnf can be described as below.
.. code-block:: yaml
VCpuUsageMeanVnf: >-
avg(sum(rate(pod_cpu_usage_seconds_total
{{namespace="{namespace}",pod=~"{pod}"}}[{reporting_period}s])))
F-string of PromQL statement
----------------------------
For above PromQL statement, f-string of python [#f_string]_ is used.
In the f-string, below replacement field can be used. They are replaced
with a SOL-API's attribute [#etsi_sol_003]_ or Tacker internal value.
``{collection_period}``
Replaced with collectionPeriod attribute of SOL-API.
``{pod}``
Replaced with a resourceId when subObjectInstanceIds are specified
(e.g: "test-test1-8d6db447f-stzhb").
Or, replaced with regexp that matches each resourceIds in vnfInstance when
subObjectInstanceIds are not specified
(e.g: "(test-test1-[0-9a-f]{1,10}-[0-9a-z]{5}$|
test-test2-[0-9a-f]{1,10}-[0-9a-z]{5}$)").
``{reporting_period}``
Replaced with reportingPeriod attribute of SOL-API.
``{sub_object_instance_id}``
Replaced with an element of subObjectInstanceIds of SOL-API.
``{namespace}``
Replaced with the kubernetes namespace that the vnfInstance belongs to.
Using Vendor Specific Plugin
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -487,3 +593,8 @@ tacker.sol_refactored.common.monitoring_plugin_base.MonitoringPlugin.
* - ``CONF.prometheus_plugin.auto_healing_class``
- PrometheusPluginAutoHealing
- Class name for auto healing.
.. rubric:: Footnotes
.. [#yaml] https://yaml.org/spec/1.2-old/spec.html
.. [#etsi_sol_003] https://www.etsi.org/deliver/etsi_gs/NFV-SOL/001_099/003/03.03.01_60/gs_nfv-sol003v030301p.pdf
.. [#f_string] https://docs.python.org/3.9/tutorial/inputoutput.html#fancier-output-formatting

View File

@ -0,0 +1,121 @@
# Prometheus plugin configuration file
#
# This describes the Prometheus plugin configuration. This is used when
# Prometheus Plugin creates an alert rule. You can use your own promQL
# statements. Put this file in the configuration directory (e.g:/etc/tacker).
#
# The settings are key-value pairs of a performanceMetric and a PromQL
# statement. The performanceMetric is defined in ETSI GS NFV-SOL 003. For
# PromQL statement, f-string of python is used. In the f-string, below
# replacement field can be used. They are replaced with a SOL-API's attribute
# or Tacker internal value.
#
# {collection_period}:
# Replaced with collectionPeriod attribute of SOL-API.
# {pod}:
# Replaced with a resourceId when subObjectInstanceIds are specified.
# e.g: test-test1-8d6db447f-stzhb
# Replaced with regexp that matches each resourceId in vnfInstance when
# subObjectInstanceIds are not specified.
# e.g: (test-test1-[0-9a-f]{1,10}-[0-9a-z]{5}$|
# test-test2-[0-9a-f]{1,10}-[0-9a-z]{5}$)
# {reporting_period}:
# Replaced with reportingPeriod attribute of SOL-API.
# {sub_object_instance_id}:
# Replaced with an element of subObjectInstanceIds of SOL-API.
# {namespace}:
# Replaced with the kubernetes namespace that the vnfInstance belongs to.
#
PMJob:
PromQL:
VCpuUsageMeanVnf: >-
avg(sum(rate(pod_cpu_usage_seconds_total
{{namespace="{namespace}",pod=~"{pod}"}}[{reporting_period}s])))
VCpuUsagePeakVnf: >-
max(sum(rate(pod_cpu_usage_seconds_total
{{namespace="{namespace}",pod=~"{pod}"}}[{reporting_period}s])))
VMemoryUsageMeanVnf: >-
avg(pod_memory_working_set_bytes{{namespace="{namespace}",pod=~"{pod}"}} /
on(pod) (kube_node_status_capacity{{resource="memory"}} *
on(node) group_right kube_pod_info{{pod=~"{pod}"}}))
VMemoryUsagePeakVnf: >-
max(pod_memory_working_set_bytes{{namespace="{namespace}",pod=~"{pod}"}} /
on(pod) (kube_node_status_capacity{{resource="memory"}} *
on(node) group_right kube_pod_info{{pod=~"{pod}"}}))
VDiskUsageMeanVnf: >-
avg(max(container_fs_usage_bytes{{namespace="{namespace}",pod=~"{pod}"}}/
container_fs_limit_bytes{{namespace="{namespace}",pod=~"{pod}"}}))
VDiskUsagePeakVnf: >-
max(max(container_fs_usage_bytes{{namespace="{namespace}",pod=~"{pod}"}}/
container_fs_limit_bytes{{namespace="{namespace}",pod=~"{pod}"}}))
ByteIncomingVnfIntCp: >-
sum(container_network_receive_bytes_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
PacketIncomingVnfIntCp: >-
sum(container_network_receive_packets_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
ByteOutgoingVnfIntCp: >-
sum(container_network_transmit_bytes_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
PacketOutgoingVnfIntCp: >-
sum(container_network_transmit_packets_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
ByteIncomingVnfExtCp: >-
sum(container_network_receive_bytes_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
PacketIncomingVnfExtCp: >-
sum(container_network_receive_packets_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
ByteOutgoingVnfExtCp: >-
sum(container_network_transmit_bytes_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
PacketOutgoingVnfExtCp: >-
sum(container_network_transmit_packets_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
Threshold:
PromQL:
VCpuUsageMeanVnf: >-
avg(sum(rate(pod_cpu_usage_seconds_total
{{namespace="{namespace}",pod=~"{pod}"}}[{reporting_period}s])))
VCpuUsagePeakVnf: >-
max(sum(rate(pod_cpu_usage_seconds_total
{{namespace="{namespace}",pod=~"{pod}"}}[{reporting_period}s])))
VMemoryUsageMeanVnf: >-
avg(pod_memory_working_set_bytes{{namespace="{namespace}",pod=~"{pod}"}} /
on(pod) (kube_node_status_capacity{{resource="memory"}} *
on(node) group_right kube_pod_info{{pod=~"{pod}"}}))
VMemoryUsagePeakVnf: >-
max(pod_memory_working_set_bytes{{namespace="{namespace}",pod=~"{pod}"}} /
on(pod) (kube_node_status_capacity{{resource="memory"}} *
on(node) group_right kube_pod_info{{pod=~"{pod}"}}))
VDiskUsageMeanVnf: >-
avg(max(container_fs_usage_bytes{{namespace="{namespace}",pod=~"{pod}"}}/
container_fs_limit_bytes{{namespace="{namespace}",pod=~"{pod}"}}))
VDiskUsagePeakVnf: >-
max(max(container_fs_usage_bytes{{namespace="{namespace}",pod=~"{pod}"}}/
container_fs_limit_bytes{{namespace="{namespace}",pod=~"{pod}"}}))
ByteIncomingVnfIntCp: >-
sum(container_network_receive_bytes_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
PacketIncomingVnfIntCp: >-
sum(container_network_receive_packets_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
ByteOutgoingVnfIntCp: >-
sum(container_network_transmit_bytes_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
PacketOutgoingVnfIntCp: >-
sum(container_network_transmit_packets_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
ByteIncomingVnfExtCp: >-
sum(container_network_receive_bytes_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
PacketIncomingVnfExtCp: >-
sum(container_network_receive_packets_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
ByteOutgoingVnfExtCp: >-
sum(container_network_transmit_bytes_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})
PacketOutgoingVnfExtCp: >-
sum(container_network_transmit_packets_total
{{namespace="{namespace}",interface="{sub_object_instance_id}",pod=~"{pod}"}})

View File

@ -28,6 +28,7 @@ data_files =
etc/tacker =
etc/tacker/api-paste.ini
etc/tacker/rootwrap.conf
etc/tacker/prometheus-plugin.yaml
etc/rootwrap.d =
etc/tacker/rootwrap.d/tacker.filters
etc/init.d = etc/init.d/tacker-server

View File

@ -20,10 +20,12 @@ import os
import paramiko
import re
import tempfile
import yaml
from keystoneauth1 import exceptions as ks_exc
from oslo_log import log as logging
from oslo_utils import uuidutils
from tacker.common import utils
from tacker.sol_refactored.api import prometheus_plugin_validator as validator
from tacker.sol_refactored.api.schemas import prometheus_plugin_schemas
from tacker.sol_refactored.common import config as cfg
@ -54,7 +56,283 @@ class PrometheusPlugin():
return t if t.tzinfo else t.astimezone()
class PrometheusPluginPm(PrometheusPlugin, mon_base.MonitoringPlugin):
class PrometheusPluginPmBase(PrometheusPlugin):
def __init__(self):
super(PrometheusPluginPmBase, self).__init__()
auth_handle = http_client.NoAuthHandle()
self.client = http_client.HttpClient(auth_handle)
def convert_measurement_unit(self, metric, value):
if re.match(r'^V(Cpu|Memory|Disk)Usage(Mean|Peak)Vnf\..+', metric):
value = float(value)
elif re.match(r'^(Byte|Packet)(Incoming|Outgoing)Vnf(IntCp|ExtCp)',
metric):
value = int(value)
else:
raise sol_ex.PrometheusPluginError(
"Failed to convert annotations.value to measurement unit.")
return value
def load_prom_config(self):
config_file = utils.find_config_file({}, 'prometheus-plugin.yaml')
if not config_file:
raise sol_ex.PrometheusPluginError(
"prometheus-plugin.yaml not found."
)
LOG.info(f"prom_config file: {config_file}")
with open(config_file) as file:
prom_config = yaml.safe_load(file.read())
return prom_config
def make_prom_ql(self, target, pod, collection_period=30,
reporting_period=90, sub_object_instance_id='*',
pm_type='PMJob', namespace='default'):
REPORTING_PERIOD_MIN = 30
reporting_period = max(reporting_period, REPORTING_PERIOD_MIN)
prom_config = self.load_prom_config()
expr = prom_config[pm_type]['PromQL'][target].format(
pod=pod,
collection_period=collection_period,
reporting_period=reporting_period,
sub_object_instance_id=sub_object_instance_id,
namespace=namespace
)
LOG.info(f"promQL expr: {expr}")
return expr
def make_rule(self, type, id, object_instance_id, sub_object_instance_id,
metric, expression, collection_period=30):
if type == 'PMJob':
labels = {
'alertname': '',
'receiver_type': 'tacker',
'function_type': 'vnfpm',
'job_id': id,
'object_instance_id': object_instance_id,
'sub_object_instance_id': sub_object_instance_id,
'metric': metric
}
elif type == 'Threshold':
labels = {
'alertname': '',
'receiver_type': 'tacker',
'function_type': 'vnfpm-threshold',
'threshold_id': id,
'object_instance_id': object_instance_id,
'sub_object_instance_id': sub_object_instance_id,
'metric': metric
}
else:
raise sol_ex.PrometheusPluginError(
"Invalid type in make_rule()."
)
labels = {k: v for k, v in labels.items() if v is not None}
annotations = {
'value': r'{{$value}}'
}
rule = {
'alert': uuidutils.generate_uuid(),
'expr': expression,
'for': f'{collection_period}s',
'labels': labels,
'annotations': annotations
}
return rule
def get_namespace(self, inst):
return inst.instantiatedVnfInfo.metadata.get(
'namespace', 'default') if (
inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set(
'metadata')) else 'default'
def get_vnfc_resource_info(self, inst):
return inst.instantiatedVnfInfo.vnfcResourceInfo if (
inst.obj_attr_is_set('instantiatedVnfInfo') and
inst.instantiatedVnfInfo.obj_attr_is_set(
'vnfcResourceInfo')) else None
def get_pod_regexp(self, inst):
# resource ids are like:
# ['test-test1-756757f8f-xcwmt',
# 'test-test2-756757f8f-kmghr', ...]
# convert them to a regex string such as:
# '(test-test1-[0-9a-f]{1,10}-[0-9a-z]{5}$|
# test-test2-[0-9a-f]{1,10}-[0-9a-z]{5}$|...)'
resource_info = self.get_vnfc_resource_info(inst)
if not resource_info:
return None
deployments = list(filter(
lambda r:
r.computeResource.obj_attr_is_set(
'vimLevelResourceType')
and r.computeResource.obj_attr_is_set(
'resourceId'
)
and r.computeResource.vimLevelResourceType ==
'Deployment', resource_info
))
deployments = list(set(list(map(
lambda d: re.sub(
r'\-[0-9a-f]{1,10}\-[0-9a-z]{5}$', '',
d.computeResource.resourceId) +
r'-[0-9a-f]{1,10}-[0-9a-z]{5}$',
deployments
))))
return ('(' + '|'.join(deployments) + ')'
if len(deployments) else None)
def get_compute_resource_by_sub_obj(self, inst, sub_obj):
if (not inst.obj_attr_is_set('instantiatedVnfInfo') or
not inst.instantiatedVnfInfo.obj_attr_is_set(
'vnfcResourceInfo') or
not inst.instantiatedVnfInfo.obj_attr_is_set('vnfcInfo')):
return None
vnfc_info = list(filter(
lambda x: (x.obj_attr_is_set('vnfcResourceInfoId') and
x.id == sub_obj),
inst.instantiatedVnfInfo.vnfcInfo))
if len(vnfc_info) == 0:
return None
resources = list(filter(
lambda x: (vnfc_info[0].obj_attr_is_set('vnfcResourceInfoId') and
x.id == vnfc_info[0].vnfcResourceInfoId and
x.computeResource.obj_attr_is_set('vimLevelResourceType') and
x.computeResource.vimLevelResourceType == 'Deployment' and
x.computeResource.obj_attr_is_set('resourceId')),
inst.instantiatedVnfInfo.vnfcResourceInfo))
if len(resources) == 0:
return None
return resources[0].computeResource
def _delete_rule(self, host, port, user, password, path, id):
with paramiko.Transport(sock=(host, port)) as client:
client.connect(username=user, password=password)
sftp = paramiko.SFTPClient.from_transport(client)
sftp.remove(f'{path}/{id}.json')
def reload_prom_server(self, context, reload_uri):
resp, _ = self.client.do_request(
reload_uri, "PUT", context=context)
if resp.status_code >= 400 and resp.status_code < 600:
raise sol_ex.PrometheusPluginError(
f"Reloading request to prometheus is failed: "
f"{resp.status_code}.")
def _upload_rule(self, rule_group, host, port, user, password, path, id):
with tempfile.TemporaryDirectory() as tmpdir:
with open(os.path.join(tmpdir, 'rule.json'),
'w+', encoding="utf-8") as fp:
json.dump(rule_group, fp, indent=4, ensure_ascii=False)
filename = fp.name
with paramiko.Transport(sock=(host, port)) as client:
LOG.info("Upload rule files to prometheus server: %s.", host)
client.connect(username=user, password=password)
sftp = paramiko.SFTPClient.from_transport(client)
sftp.put(filename, f'{path}/{id}.json')
self.verify_rule(host, port, user, password, path, id)
def verify_rule(self, host, port, user, password, path, id):
if not CONF.prometheus_plugin.test_rule_with_promtool:
return
with paramiko.SSHClient() as client:
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host, port=port, username=user, password=password)
command = f"promtool check rules {path}/{id}.json"
LOG.info("Rule file validation command: %s", command)
_, stdout, stderr = client.exec_command(command)
if stdout.channel.recv_exit_status() != 0:
error_byte = stderr.read()
error_str = error_byte.decode('utf-8')
LOG.error(
"Rule file validation with promtool failed: %s",
error_str)
raise sol_ex.PrometheusPluginError(
"Rule file validation with promtool failed.")
def delete_rules(self, context, pm_job_or_threshold):
target_list, reload_list = self.get_access_info(pm_job_or_threshold)
for target in target_list:
try:
self._delete_rule(
target['host'], target['port'], target['user'],
target['password'], target['path'], pm_job_or_threshold.id)
except (sol_ex.PrometheusPluginError, ks_exc.ClientException,
paramiko.SSHException):
# NOTE(shimizu-koji): This exception is ignored.
# DELETE /pm_jobs/{id} will be success even if _delete_rule()
# is failed. Because the rule file was already deleted.
pass
for uri in reload_list:
try:
self.reload_prom_server(context, uri)
except (sol_ex.PrometheusPluginError, ks_exc.ClientException,
paramiko.SSHException):
pass
def get_access_info(self, pm_job_or_threshold):
target_list = []
reload_list = []
if (not pm_job_or_threshold.obj_attr_is_set('metadata')
or 'monitoring' not in pm_job_or_threshold.metadata):
raise sol_ex.PrometheusPluginError(
"monitoring info is missing at metadata field.")
access_info = pm_job_or_threshold.metadata['monitoring']
if (access_info.get('monitorName') != 'prometheus' or
access_info.get('driverType') != 'external'):
raise sol_ex.PrometheusPluginError(
"prometheus info is missing at metadata field.")
for info in access_info.get('targetsInfo', []):
host = info.get('prometheusHost', '')
port = info.get('prometheusHostPort', 22)
auth = info.get('authInfo', {})
user = auth.get('ssh_username', '')
password = auth.get('ssh_password', '')
path = info.get('alertRuleConfigPath', '')
uri = info.get('prometheusReloadApiEndpoint', '')
if not (host and user and path and uri):
continue
target_list.append({
'host': host,
'port': port,
'user': user,
'password': password,
'path': path
})
reload_list.append(uri)
return target_list, list(set(reload_list))
def upload_rules(self, context, target_list, reload_list, rule_group, id):
def _cleanup_error(target_list):
for target in target_list:
try:
self._delete_rule(target['host'], target['port'],
target['user'], target['password'], target['path'],
id)
except (sol_ex.PrometheusPluginError, ks_exc.ClientException,
paramiko.SSHException):
pass
try:
for target in target_list:
self._upload_rule(
rule_group, target['host'], target['port'],
target['user'], target['password'], target['path'],
id)
for uri in reload_list:
self.reload_prom_server(context, uri)
except (sol_ex.PrometheusPluginError, ks_exc.ClientException,
paramiko.SSHException) as e:
LOG.error("failed to upload rule files: %s", e.args[0])
_cleanup_error(target_list)
raise e
except Exception as e:
_cleanup_error(target_list)
raise e
class PrometheusPluginPm(PrometheusPluginPmBase, mon_base.MonitoringPlugin):
_instance = None
@staticmethod
@ -73,62 +351,9 @@ class PrometheusPluginPm(PrometheusPlugin, mon_base.MonitoringPlugin):
"Not constructor but instance() should be used.")
super(PrometheusPluginPm, self).__init__()
self.notification_callback = None
auth_handle = http_client.NoAuthHandle()
self.client = http_client.HttpClient(auth_handle)
self.reporting_period_margin = (
CONF.prometheus_plugin.reporting_period_margin)
self.notification_callback = self.default_callback
# Pod name can be specified but container name can not.
# And some prometheus metrics need container name. Therefore, max
# statement of PromQL is alternatively used in some measurements to
# measure without container name. That means it provids only most
# impacted value among the containers.
self.sol_exprs = {
'VCpuUsageMeanVnf':
'avg(sum(rate(pod_cpu_usage_seconds_total'
'{{pod=~"{pod}"}}[{reporting_period}s])))',
'VCpuUsagePeakVnf':
'max(sum(rate(pod_cpu_usage_seconds_total'
'{{pod=~"{pod}"}}[{reporting_period}s])))',
'VMemoryUsageMeanVnf':
'avg(pod_memory_working_set_bytes{{pod=~"{pod}"}} / '
'on(pod) (kube_node_status_capacity{{resource="memory"}} * '
'on(node) group_right kube_pod_info))',
'VMemoryUsagePeakVnf':
'max(pod_memory_working_set_bytes{{pod=~"{pod}"}} / '
'on(pod) (kube_node_status_capacity{{resource="memory"}} * '
'on(node) group_right kube_pod_info))',
'VDiskUsageMeanVnf':
'avg(max(container_fs_usage_bytes{{pod=~"{pod}"}}/'
'container_fs_limit_bytes{{pod=~"{pod}"}}))',
'VDiskUsagePeakVnf':
'max(max(container_fs_usage_bytes{{pod=~"{pod}"}}/'
'container_fs_limit_bytes{{pod=~"{pod}"}}))',
'ByteIncomingVnfIntCp':
'sum(container_network_receive_bytes_total'
'{{interface="{sub_object_instance_id}",pod=~"{pod}"}})',
'PacketIncomingVnfIntCp':
'sum(container_network_receive_packets_total'
'{{interface="{sub_object_instance_id}",pod=~"{pod}"}})',
'ByteOutgoingVnfIntCp':
'sum(container_network_transmit_bytes_total'
'{{interface="{sub_object_instance_id}",pod=~"{pod}"}})',
'PacketOutgoingVnfIntCp':
'sum(container_network_transmit_packets_total'
'{{interface="{sub_object_instance_id}",pod=~"{pod}"}})',
'ByteIncomingVnfExtCp':
'sum(container_network_receive_bytes_total'
'{{interface="{sub_object_instance_id}",pod=~"{pod}"}})',
'PacketIncomingVnfExtCp':
'sum(container_network_receive_packets_total'
'{{interface="{sub_object_instance_id}",pod=~"{pod}"}})',
'ByteOutgoingVnfExtCp':
'sum(container_network_transmit_bytes_total'
'{{interface="{sub_object_instance_id}",pod=~"{pod}"}})',
'PacketOutgoingVnfExtCp':
'sum(container_network_transmit_packets_total'
'{{interface="{sub_object_instance_id}",pod=~"{pod}"}})',
}
PrometheusPluginPm._instance = self
def set_callback(self, notification_callback):
@ -152,17 +377,6 @@ class PrometheusPluginPm(PrometheusPlugin, mon_base.MonitoringPlugin):
def default_callback(self, context, entries):
self.rpc.store_job_info(context, entries)
def convert_measurement_unit(self, metric, value):
if re.match(r'^V(Cpu|Memory|Disk)Usage(Mean|Peak)Vnf\..+', metric):
value = float(value)
elif re.match(r'^(Byte|Packet)(Incoming|Outgoing)Vnf(IntCp|ExtCp)',
metric):
value = int(value)
else:
raise sol_ex.PrometheusPluginError(
"Failed to convert annotations.value to measurement unit.")
return value
def get_datetime_of_latest_report(
self, context, pm_job, object_instance_id,
sub_object_instance_id, metric):
@ -305,77 +519,7 @@ class PrometheusPluginPm(PrometheusPlugin, mon_base.MonitoringPlugin):
)
return metrics
def make_prom_ql(self, target, pod, collection_period=30,
reporting_period=90, sub_object_instance_id='*'):
reporting_period = max(reporting_period, 30)
expr = self.sol_exprs[target].format(
pod=pod,
collection_period=collection_period,
reporting_period=reporting_period,
sub_object_instance_id=sub_object_instance_id
)
return expr
def make_rule(self, pm_job, object_instance_id, sub_object_instance_id,
metric, expression, collection_period):
labels = {
'alertname': '',
'receiver_type': 'tacker',
'function_type': 'vnfpm',
'job_id': pm_job.id,
'object_instance_id': object_instance_id,
'sub_object_instance_id': sub_object_instance_id,
'metric': metric
}
labels = {k: v for k, v in labels.items() if v is not None}
annotations = {
'value': r'{{$value}}'
}
rule = {
'alert': uuidutils.generate_uuid(),
'expr': expression,
'for': f'{collection_period}s',
'labels': labels,
'annotations': annotations
}
return rule
def get_vnfc_resource_info(self, _, vnf_instance_id, inst_map):
inst = inst_map[vnf_instance_id]
if not inst.obj_attr_is_set('instantiatedVnfInfo') or\
not inst.instantiatedVnfInfo.obj_attr_is_set(
'vnfcResourceInfo'):
return None
return inst.instantiatedVnfInfo.vnfcResourceInfo
def get_pod_regexp(self, resource_info):
# resource ids are like:
# ['test-test1-756757f8f-xcwmt',
# 'test-test2-756757f8f-kmghr', ...]
# convert them to a regex string such as:
# '(test-test1-[0-9a-f]{1,10}-[0-9a-z]{5}$|
# test-test2-[0-9a-f]{1,10}-[0-9a-z]{5}$|...)'
deployments = list(filter(
lambda r:
r.computeResource.obj_attr_is_set(
'vimLevelResourceType')
and r.computeResource.obj_attr_is_set(
'resourceId'
)
and r.computeResource.vimLevelResourceType ==
'Deployment', resource_info
))
deployments = list(set(list(map(
lambda d: re.sub(
r'\-[0-9a-f]{1,10}\-[0-9a-z]{5}$', '',
d.computeResource.resourceId) +
r'-[0-9a-f]{1,10}-[0-9a-z]{5}$',
deployments
))))
pods_regexp = '(' + '|'.join(deployments) + ')'
return deployments, pods_regexp
def _make_rules_for_each_obj(self, context, pm_job, inst_map, metric):
def _make_rules_for_each_obj(self, pm_job, inst_map, metric):
target = re.sub(r'\..+$', '', metric)
objs = pm_job.objectInstanceIds
collection_period = pm_job.criteria.collectionPeriod
@ -388,45 +532,19 @@ class PrometheusPluginPm(PrometheusPlugin, mon_base.MonitoringPlugin):
# convert them to a regex string such as:
# '(test-test1-[0-9a-f]{1,10}-[0-9a-z]{5}$|
# test-test2-[0-9a-f]{1,10}-[0-9a-z]{5}$|...)'
resource_info = self.get_vnfc_resource_info(context, obj, inst_map)
if not resource_info:
continue
deployments, pods_regexp = self.get_pod_regexp(resource_info)
if len(deployments) == 0:
pods_regexp = self.get_pod_regexp(inst_map[obj])
if pods_regexp is None:
continue
namespace = self.get_namespace(inst_map[obj])
expr = self.make_prom_ql(
target, pods_regexp, collection_period=collection_period,
reporting_period=reporting_period)
reporting_period=reporting_period, namespace=namespace)
rules.append(self.make_rule(
pm_job, obj, None, metric, expr,
collection_period))
'PMJob', pm_job.id, obj, None, metric, expr,
collection_period=collection_period))
return rules
def get_compute_resource_by_sub_obj(self, vnf_instance, sub_obj):
inst = vnf_instance
if (not inst.obj_attr_is_set('instantiatedVnfInfo') or
not inst.instantiatedVnfInfo.obj_attr_is_set(
'vnfcResourceInfo') or
not inst.instantiatedVnfInfo.obj_attr_is_set('vnfcInfo')):
return None
vnfc_info = list(filter(
lambda x: (x.obj_attr_is_set('vnfcResourceInfoId') and
x.id == sub_obj),
inst.instantiatedVnfInfo.vnfcInfo))
if len(vnfc_info) == 0:
return None
resources = list(filter(
lambda x: (vnfc_info[0].obj_attr_is_set('vnfcResourceInfoId') and
x.id == vnfc_info[0].vnfcResourceInfoId and
x.computeResource.obj_attr_is_set('vimLevelResourceType') and
x.computeResource.vimLevelResourceType == 'Deployment' and
x.computeResource.obj_attr_is_set('resourceId')),
inst.instantiatedVnfInfo.vnfcResourceInfo))
if len(resources) == 0:
return None
return resources[0].computeResource
def _make_rules_for_each_sub_obj(self, context, pm_job, inst_map, metric):
def _make_rules_for_each_sub_obj(self, pm_job, inst_map, metric):
target = re.sub(r'\..+$', '', metric)
objs = pm_job.objectInstanceIds
sub_objs = pm_job.subObjectInstanceIds\
@ -435,7 +553,7 @@ class PrometheusPluginPm(PrometheusPlugin, mon_base.MonitoringPlugin):
collection_period = pm_job.criteria.collectionPeriod
reporting_period = pm_job.criteria.reportingPeriod
rules = []
resource_info = self.get_vnfc_resource_info(context, objs[0], inst_map)
resource_info = self.get_vnfc_resource_info(inst_map[objs[0]])
if not resource_info:
return []
if pm_job.objectType in {'Vnf', 'Vnfc'}:
@ -446,38 +564,39 @@ class PrometheusPluginPm(PrometheusPlugin, mon_base.MonitoringPlugin):
if not compute_resource:
continue
resource_id = compute_resource.resourceId
namespace = self.get_namespace(inst)
expr = self.make_prom_ql(
target, resource_id,
collection_period=collection_period,
reporting_period=reporting_period)
reporting_period=reporting_period,
namespace=namespace)
rules.append(self.make_rule(
pm_job, objs[0], sub_obj, metric, expr,
collection_period))
'PMJob', pm_job.id, objs[0], sub_obj, metric, expr,
collection_period=collection_period))
else:
deployments, pods_regexp = self.get_pod_regexp(resource_info)
if len(deployments) == 0:
pods_regexp = self.get_pod_regexp(inst_map[objs[0]])
if pods_regexp is None:
return []
for sub_obj in sub_objs:
namespace = self.get_namespace(inst_map[objs[0]])
expr = self.make_prom_ql(
target, pods_regexp, collection_period=collection_period,
reporting_period=reporting_period,
sub_object_instance_id=sub_obj)
sub_object_instance_id=sub_obj, namespace=namespace)
rules.append(self.make_rule(
pm_job, objs[0], sub_obj, metric, expr,
collection_period))
'PMJob', pm_job.id, objs[0], sub_obj, metric, expr,
collection_period=collection_period))
return rules
def _make_rules(self, context, pm_job, metric, inst_map):
def _make_rules(self, pm_job, metric, inst_map):
sub_objs = pm_job.subObjectInstanceIds\
if (pm_job.obj_attr_is_set('subObjectInstanceIds') and
pm_job.subObjectInstanceIds) else []
# Cardinality of objectInstanceIds and subObjectInstanceIds
# is N:0 or 1:N.
if len(sub_objs) > 0:
return self._make_rules_for_each_sub_obj(
context, pm_job, inst_map, metric)
return self._make_rules_for_each_obj(
context, pm_job, inst_map, metric)
return self._make_rules_for_each_sub_obj(pm_job, inst_map, metric)
return self._make_rules_for_each_obj(pm_job, inst_map, metric)
def decompose_metrics_vnfintextcp(self, pm_job):
group_name = 'VnfInternalCp'\
@ -504,32 +623,6 @@ class PrometheusPluginPm(PrometheusPlugin, mon_base.MonitoringPlugin):
)
return metrics
def _delete_rule(self, host, port, user, password, path, pm_job_id):
with paramiko.Transport(sock=(host, port)) as client:
client.connect(username=user, password=password)
sftp = paramiko.SFTPClient.from_transport(client)
sftp.remove(f'{path}/{pm_job_id}.json')
def delete_rules(self, context, pm_job):
target_list, reload_list = self.get_access_info(pm_job)
for target in target_list:
try:
self._delete_rule(
target['host'], target['port'], target['user'],
target['password'], target['path'], pm_job.id)
except (sol_ex.PrometheusPluginError, ks_exc.ClientException,
paramiko.SSHException):
# This exception is ignored. DELETE /pm_jobs/{id}
# will be success even if _delete_rule() is failed.
# Because the rule file was already deleted.
pass
for uri in reload_list:
try:
self.reload_prom_server(context, uri)
except (sol_ex.PrometheusPluginError, ks_exc.ClientException,
paramiko.SSHException):
pass
def decompose_metrics(self, pm_job):
if pm_job.objectType in {'Vnf', 'Vnfc'}:
return self.decompose_metrics_vnfc(pm_job)
@ -538,107 +631,6 @@ class PrometheusPluginPm(PrometheusPlugin, mon_base.MonitoringPlugin):
raise sol_ex.PrometheusPluginError(
f"Invalid objectType: {pm_job.objectType}.")
def reload_prom_server(self, context, reload_uri):
resp, _ = self.client.do_request(
reload_uri, "PUT", context=context)
if resp.status_code >= 400 and resp.status_code < 600:
raise sol_ex.PrometheusPluginError(
f"Reloading request to prometheus is failed: "
f"{resp.status_code}.")
def _upload_rule(self, rule_group, host, port, user, password, path,
pm_job_id):
with tempfile.TemporaryDirectory() as tmpdir:
with open(os.path.join(tmpdir, 'rule.json'),
'w+', encoding="utf-8") as fp:
json.dump(rule_group, fp, indent=4, ensure_ascii=False)
filename = fp.name
with paramiko.Transport(sock=(host, port)) as client:
LOG.info("Upload rule files to prometheus server: %s.", host)
client.connect(username=user, password=password)
sftp = paramiko.SFTPClient.from_transport(client)
sftp.put(filename, f'{path}/{pm_job_id}.json')
self.verify_rule(host, port, user, password, path, pm_job_id)
def verify_rule(self, host, port, user, password, path, pm_job_id):
if not CONF.prometheus_plugin.test_rule_with_promtool:
return
with paramiko.SSHClient() as client:
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host, port=port, username=user, password=password)
command = f"promtool check rules {path}/{pm_job_id}.json"
LOG.info("Rule file validation command: %s", command)
_, stdout, stderr = client.exec_command(command)
if stdout.channel.recv_exit_status() != 0:
error_byte = stderr.read()
error_str = error_byte.decode('utf-8')
LOG.error(
"Rule file validation with promtool failed: %s",
error_str)
raise sol_ex.PrometheusPluginError(
"Rule file validation with promtool failed.")
def get_access_info(self, pm_job):
target_list = []
reload_list = []
if (not pm_job.obj_attr_is_set('metadata')
or 'monitoring' not in pm_job.metadata):
raise sol_ex.PrometheusPluginError(
"monitoring info is missing at metadata field.")
access_info = pm_job.metadata['monitoring']
if (access_info.get('monitorName') != 'prometheus' or
access_info.get('driverType') != 'external'):
raise sol_ex.PrometheusPluginError(
"prometheus info is missing at metadata field.")
for info in access_info.get('targetsInfo', []):
host = info.get('prometheusHost', '')
port = info.get('prometheusHostPort', 22)
auth = info.get('authInfo', {})
user = auth.get('ssh_username', '')
password = auth.get('ssh_password', '')
path = info.get('alertRuleConfigPath', '')
uri = info.get('prometheusReloadApiEndpoint', '')
if not (host and user and path and uri):
continue
target_list.append({
'host': host,
'port': port,
'user': user,
'password': password,
'path': path
})
reload_list.append(uri)
return target_list, list(set(reload_list))
def upload_rules(
self, context, target_list, reload_list, rule_group, pm_job):
def _cleanup_error(target_list):
for target in target_list:
try:
self._delete_rule(target['host'], target['port'],
target['user'], target['password'], target['path'],
pm_job.id)
except (sol_ex.PrometheusPluginError, ks_exc.ClientException,
paramiko.SSHException):
pass
try:
for target in target_list:
self._upload_rule(
rule_group, target['host'], target['port'],
target['user'], target['password'], target['path'],
pm_job.id)
for uri in reload_list:
self.reload_prom_server(context, uri)
except (sol_ex.PrometheusPluginError, ks_exc.ClientException,
paramiko.SSHException) as e:
LOG.error("failed to upload rule files: %s", e.args[0])
_cleanup_error(target_list)
raise e
except Exception as e:
_cleanup_error(target_list)
raise e
def get_vnf_instances(self, context, pm_job):
object_instance_ids = list(set(pm_job.objectInstanceIds))
return dict(zip(
@ -651,7 +643,7 @@ class PrometheusPluginPm(PrometheusPlugin, mon_base.MonitoringPlugin):
target_list, reload_list = self.get_access_info(pm_job)
metrics = self.decompose_metrics(pm_job)
inst_map = self.get_vnf_instances(context, pm_job)
rules = sum([self._make_rules(context, pm_job, metric, inst_map)
rules = sum([self._make_rules(pm_job, metric, inst_map)
for metric in metrics], [])
if len(rules) == 0:
raise sol_ex.PrometheusPluginError(
@ -666,7 +658,7 @@ class PrometheusPluginPm(PrometheusPlugin, mon_base.MonitoringPlugin):
]
}
self.upload_rules(
context, target_list, reload_list, rule_group, pm_job)
context, target_list, reload_list, rule_group, pm_job.id)
return rule_group

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo_utils import uuidutils
@ -588,6 +589,95 @@ def terminate_vnf_min():
}
def pm_job_external(callback_uri, inst_id, host_ip, rsc_id):
def pm_job(
callback_uri, inst_id, host_ip,
object_type, performance_metric,
sub_object_instance_id=None):
job = {
"objectType": object_type,
"objectInstanceIds": [inst_id],
"subObjectInstanceIds": ([sub_object_instance_id]
if sub_object_instance_id else []),
"criteria": {
"performanceMetric": [performance_metric],
"performanceMetricGroup": [],
"collectionPeriod": 30,
"reportingPeriod": 90
},
"callbackUri": callback_uri,
"metadata": {
"monitoring": {
"monitorName": "prometheus",
"driverType": "external",
"targetsInfo": [
{
"prometheusHost": host_ip,
"prometheusHostPort": 50022,
"authInfo": {
"ssh_username": "root",
"ssh_password": "root"
},
"alertRuleConfigPath":
"/tmp",
"prometheusReloadApiEndpoint":
"http://localhost:9990/-/reload",
}
]
}
}
}
return copy.deepcopy(job)
return [
pm_job(callback_uri, inst_id, host_ip, "Vnf",
f"VCpuUsageMeanVnf.{inst_id}"),
pm_job(callback_uri, inst_id, host_ip, "Vnf",
f"VCpuUsagePeakVnf.{inst_id}"),
pm_job(callback_uri, inst_id, host_ip, "Vnf",
f"VMemoryUsageMeanVnf.{inst_id}"),
pm_job(callback_uri, inst_id, host_ip, "Vnf",
f"VMemoryUsagePeakVnf.{inst_id}"),
pm_job(callback_uri, inst_id, host_ip, "Vnf",
f"VDiskUsageMeanVnf.{inst_id}"),
pm_job(callback_uri, inst_id, host_ip, "Vnf",
f"VDiskUsagePeakVnf.{inst_id}"),
pm_job(callback_uri, inst_id, host_ip, "Vnfc",
f"VCpuUsageMeanVnf.{inst_id}",
sub_object_instance_id=rsc_id),
pm_job(callback_uri, inst_id, host_ip, "Vnfc",
f"VCpuUsagePeakVnf.{inst_id}",
sub_object_instance_id=rsc_id),
pm_job(callback_uri, inst_id, host_ip, "Vnfc",
f"VMemoryUsageMeanVnf.{inst_id}",
sub_object_instance_id=rsc_id),
pm_job(callback_uri, inst_id, host_ip, "Vnfc",
f"VMemoryUsagePeakVnf.{inst_id}",
sub_object_instance_id=rsc_id),
pm_job(callback_uri, inst_id, host_ip, "Vnfc",
f"VDiskUsageMeanVnf.{inst_id}",
sub_object_instance_id=rsc_id),
pm_job(callback_uri, inst_id, host_ip, "Vnfc",
f"VDiskUsagePeakVnf.{inst_id}",
sub_object_instance_id=rsc_id),
pm_job(callback_uri, inst_id, host_ip, "VnfIntCp",
"ByteIncomingVnfIntCp", sub_object_instance_id="eth0"),
pm_job(callback_uri, inst_id, host_ip, "VnfIntCp",
"PacketIncomingVnfIntCp", sub_object_instance_id="eth0"),
pm_job(callback_uri, inst_id, host_ip, "VnfIntCp",
"ByteOutgoingVnfIntCp", sub_object_instance_id="eth0"),
pm_job(callback_uri, inst_id, host_ip, "VnfIntCp",
"PacketOutgoingVnfIntCp", sub_object_instance_id="eth0"),
pm_job(callback_uri, inst_id, host_ip, "VnfExtCp",
"ByteIncomingVnfExtCp", sub_object_instance_id="eth1"),
pm_job(callback_uri, inst_id, host_ip, "VnfExtCp",
"PacketIncomingVnfExtCp", sub_object_instance_id="eth1"),
pm_job(callback_uri, inst_id, host_ip, "VnfExtCp",
"ByteOutgoingVnfExtCp", sub_object_instance_id="eth1"),
pm_job(callback_uri, inst_id, host_ip, "VnfExtCp",
"PacketOutgoingVnfExtCp", sub_object_instance_id="eth1")
]
def pm_job_min(callback_uri, inst_id, host_ip):
return {
"objectType": "Vnf",

View File

@ -0,0 +1,63 @@
FROM python:3.8.13
## setup and run
# [usage:]
# docker build -t tacker-monitoring-test .
# docker run -v ${PWD}/src:/work/src -v ${PWD}/rules:/etc/prometheus/rules -p 55555:55555 -p 50022:22 -e TEST_REMOTE_URI="http://<nfvo_addr>:<port>" -it tacker-monitoring-test
#
# (under proxy environment)
# sudo docker build --build-arg PROXY=$http_proxy -t tacker-monitoring-test .
# docker run -v ${PWD}/src:/work/src -v ${PWD}/rules:/etc/prometheus/rules -p 55555:55555 -p 50022:22 -e TEST_REMOTE_URI="http://<nfvo_addr>:<port>" -it tacker-monitoring-test
#
# [api:]
# curl -X POST http://<<this_tool's_url>>:55555/v2/tenant_id/servers/server_id/alarms -d '{"fault_action": "http://<<tacker_uri>>", "fault_id": "2222"}' -i
# curl -X DELETE http://<<this_tool's_url>>:55555/v2/tenant_id/servers/server_id/alarms/<<alarm_id>> -i
ARG PROXY
ENV http_proxy ${PROXY}
ENV https_proxy ${PROXY}
ENV HTTP_PROXY ${PROXY}
ENV HTTPS_PROXY ${PROXY}
USER root
RUN useradd -m user
RUN if [ ! -z "${MS_UID}" -a "${MS_UID}" -ge 1000 ] ;\
then usermod -u ${MS_UID} user ;\
else usermod -u 1000 user ; \
fi
# SSH server
RUN apt-get update && \
apt-get install -y --no-install-recommends openssh-server && \
rm -rf /var/lib/apt/lists/* && \
echo "root:root" | chpasswd && \
sed -i "s/#PermitRootLogin prohibit-password/PermitRootLogin yes/" /etc/ssh/sshd_config
RUN pip install --upgrade pip
COPY requirements.txt /tmp/requirements.txt
RUN pip install --default-timeout=1000 --no-cache-dir -r /tmp/requirements.txt
COPY entrypoint.sh /tmp/entrypoint.sh
RUN mkdir -p /work/src && chmod 777 /work/src
RUN mkdir -p /etc/prometheus/rules && chmod 777 /etc/prometheus/rules
# prometheus & promtool
ARG PROM_VERSION="2.37.5"
RUN cd /tmp && \
wget -q https://github.com/prometheus/prometheus/releases/download/v${PROM_VERSION}/prometheus-${PROM_VERSION}.linux-amd64.tar.gz && \
tar zxf /tmp/prometheus-${PROM_VERSION}.linux-amd64.tar.gz -C /usr/local/src/&& \
ln -s /usr/local/src/prometheus-${PROM_VERSION}.linux-amd64/prometheus /usr/bin/prometheus && \
ln -s /usr/local/src/prometheus-${PROM_VERSION}.linux-amd64/promtool /usr/bin/promtool
ENV http_proxy ''
ENV https_proxy ''
ENV HTTP_PROXY ''
ENV HTTPS_PROXY ''
EXPOSE 55555
EXPOSE 22
#USER user
WORKDIR /work
RUN chown "user:user" /tmp/entrypoint.sh
RUN chmod +x /tmp/entrypoint.sh
CMD [ "/tmp/entrypoint.sh" ]

View File

@ -0,0 +1,5 @@
#!/bin/bash
service ssh start
python3 /work/src/testserver.py

View File

@ -0,0 +1,256 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import os
import threading
import urllib
import urllib.request
import uuid
from datetime import datetime
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
PORT = 55555
PROM_RULE_DIR = '/etc/prometheus/rules'
server_notification_alarm_map = {}
_body_base = {
'receiver': 'receiver',
'status': 'firing',
'alerts': [
],
'groupLabels': {},
'commonLabels': {
'alertname': 'NodeInstanceDown',
'job': 'node'
},
'commonAnnotations': {
'description': 'sample'
},
'externalURL': 'http://controller147:9093',
'version': '4',
'groupKey': '{}:{}',
'truncatedAlerts': 0
}
class PeriodicTask():
def __init__(self):
self.remote_url = os.getenv('TEST_REMOTE_URI')
print(f"url: {str(self.remote_url)}")
self.schedule_next()
self.stored_alerts_fm = {}
def schedule_next(self):
self.timer = threading.Timer(10, self.run)
self.timer.start()
def server_notification_task(self):
print("server_notification_task: num of items: %s" %
str(len(server_notification_alarm_map.keys())))
for v in server_notification_alarm_map.values():
try:
if ('fault_action' in v and 'fault_id' in v):
url = v['fault_action']
body = {
'notification': {
'alarm_id': v['alarm_id'],
'fault_id': v['fault_id'],
'fault_type': '10'
}
}
headers = {
'Content-Type': 'application/json',
}
req = urllib.request.Request(
url, json.dumps(body).encode('utf-8'), headers)
with urllib.request.urlopen(req) as res:
print(f"res status: {str(res.status)}")
except Exception as ex:
print(str(ex))
def _prometheus_plugin_task(self, grp, filename):
alerts_pm = []
alerts_fm = []
alerts_auto_scale = []
use_stored_alerts = False
if filename in self.stored_alerts_fm:
print("use_stored_alerts")
stored_alerts = self.stored_alerts_fm[filename]
for a in stored_alerts:
a['status'] = 'resolved'
del self.stored_alerts_fm[filename]
alerts_fm = stored_alerts
use_stored_alerts = True
for rule in grp['rules']:
if 'labels' not in rule or 'function_type' not in rule['labels']:
continue
alt = {
'status': 'firing',
'labels': rule['labels'],
'annotations': {'value': 99},
'startsAt': datetime.now().isoformat(),
'fingerprint': str(uuid.uuid4())
}
if rule['labels']['function_type'] == 'vnfpm':
alt['annotations'] = {'value': 99}
alerts_pm.append(alt)
if (not use_stored_alerts and
rule['labels']['function_type'] == 'vnffm'):
alt['annotations'] = {
'fault_type': 'fault_type',
'probable_cause': 'probable_cause'}
alerts_fm.append(alt)
if rule['labels']['function_type'] == 'auto_scale':
alt['annotations'] = {}
alerts_auto_scale.append(alt)
if not use_stored_alerts and len(alerts_fm) > 0:
self.stored_alerts_fm[filename] = alerts_fm
return (alerts_pm, alerts_fm, alerts_auto_scale)
def prometheus_plugin_task(self):
print(f"prometheus_plugin_task: {PROM_RULE_DIR}")
for entry in os.scandir(path=PROM_RULE_DIR):
if not entry.is_file():
continue
print(f"file: {entry.name}")
try:
with open(PROM_RULE_DIR + '/' + entry.name) as f:
rules = json.load(f)
if 'groups' not in rules:
continue
for grp in rules['groups']:
if 'rules' not in grp:
continue
pm, fm, scale = self._prometheus_plugin_task(
grp, entry.name)
for x in [(pm, '/pm_event'), (fm, '/alert'),
(scale, '/alert/vnf_instances')]:
if len(x[0]) == 0:
continue
body = copy.deepcopy(_body_base)
body['alerts'] = x[0]
headers = {'Content-Type': 'application/json'}
url = self.remote_url + x[1]
req = urllib.request.Request(
url, json.dumps(body).encode('utf-8'),
headers, method='POST')
print(f"uri: {str(url)}")
print(f"body: {str(body)}")
with urllib.request.urlopen(req) as res:
print(f"res status: {str(res.status)}")
except Exception as ex:
print(str(ex))
def run(self):
print("PeriodicTask run()")
self.server_notification_task()
self.prometheus_plugin_task()
self.schedule_next()
class TestHttpServer(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def send_response(self, response_code, response_body):
super().send_response(response_code)
self.send_header("Content-type", "application/json")
self.send_header("Content-Length", str(len(response_body)))
self.end_headers()
self.wfile.write(response_body)
def do_GET(self):
print(f"GET {self.path}")
response_body = '{"result": "ok"}\n'.encode('utf-8')
self.send_response(HTTPStatus.NO_CONTENT, response_body)
def do_DELETE_server_notification(self, alarm_id):
if alarm_id in server_notification_alarm_map:
del server_notification_alarm_map[alarm_id]
self.send_response(HTTPStatus.NO_CONTENT, b'')
def do_DELETE(self):
print(f"DELETE {self.path}")
parsed = urllib.parse.urlparse(self.path)
path_detail = parsed.path.split('/')
if (len(path_detail) == 7 and
path_detail[1] == 'v2' and
path_detail[3] == 'servers' and
path_detail[5] == 'alarms'):
self.do_DELETE_server_notification(path_detail[6])
else:
self.send_response(HTTPStatus.NO_CONTENT, b'')
def do_POST_server_notification(self, decoded_data, tenant_id, server_id):
print("POST_server_notification")
try:
print(json.dumps(json.loads(decoded_data),
indent=2, ensure_ascii=False))
print(str(uuid.uuid4()))
_id = str(uuid.uuid4())
data = json.loads(decoded_data)
data['alarm_id'] = _id
server_notification_alarm_map[_id] = data
response_data = json.dumps(data) + '\n'
response = HTTPStatus.CREATED, response_data.encode('utf-8')
except Exception:
response = (
HTTPStatus.BAD_REQUEST, '{"result": "ng"}\n'.encode('utf-8'))
self.send_response(response[0], response[1])
def do_POST(self):
content_length = int(self.headers.get('content-length'))
raw_data = self.rfile.read(content_length)
decoded_data = raw_data.decode('utf-8')
print(f"POST {self.path}")
parsed = urllib.parse.urlparse(self.path)
path_detail = parsed.path.split('/')
if (len(path_detail) == 6 and
path_detail[1] == 'v2' and
path_detail[3] == 'servers' and
path_detail[5] == 'alarms'):
self.do_POST_server_notification(
decoded_data, path_detail[2], path_detail[4])
else:
try:
print(json.dumps(json.loads(decoded_data),
indent=2, ensure_ascii=False))
response = (HTTPStatus.NO_CONTENT,
'{"result": "ok"}\n'.encode('utf-8'))
except Exception:
response = (HTTPStatus.BAD_REQUEST,
'{"result": "ng"}\n'.encode('utf-8'))
self.send_response(response[0], response[1])
periodic_task = PeriodicTask()
server = HTTPServer(('0.0.0.0', PORT), TestHttpServer)
server.serve_forever()

View File

@ -343,3 +343,94 @@ class VnfPmTest(base_v2.BaseVnfLcmKubernetesV2Test):
# check deletion of VNF instance
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(404, resp.status_code)
def test_prometheus_plugin_external_data(self):
"""Test PM operations with all performanceMetric
* About PM operations:
This test includes the following operations.
- 1. Create a new VNF instance resource
- 2. Instantiate a VNF instance
(loop for each performanceMetric)
- 3. PMJob-Create
- 4. PMJob-Delete
- 5. Terminate a VNF instance
- 6. Delete a VNF instance
"""
# 1. LCM-Create: Create a new VNF instance resource
create_req = paramgen.pm_instantiate_cnf_resources_create(
self.vnfd_id_1)
resp, body = self.create_vnf_instance(create_req)
self.assertEqual(201, resp.status_code)
inst_id = body['id']
# 2. LCM-Instantiate: Instantiate a VNF instance
vim_id = self.get_k8s_vim_id()
instantiate_req = paramgen.min_sample_instantiate(vim_id)
instantiate_req['additionalParams'][
'lcm-kubernetes-def-files'] = ['Files/kubernetes/deployment.yaml']
resp, body = self.instantiate_vnf_instance(inst_id, instantiate_req)
self.assertEqual(202, resp.status_code)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
rsc = body['instantiatedVnfInfo']['vnfcInfo'][0]['id']
pm_expected_attrs = [
'id',
'objectType',
'objectInstanceIds',
'criteria',
'callbackUri',
'_links'
]
callback_url = os.path.join(base_v2.MOCK_NOTIFY_CALLBACK_URL,
self._testMethodName)
callback_uri = ('http://localhost:'
f'{base_v2.FAKE_SERVER_MANAGER.SERVER_PORT}'
f'{callback_url}')
pm_job_list = paramgen.pm_job_external(
callback_uri, inst_id, self.fake_prometheus_ip, rsc)
for job in pm_job_list:
print(f"{job['criteria']['performanceMetric'][0]}")
# 3. PMJob-Create
resp, body = self.create_pm_job(job)
self.assertEqual(201, resp.status_code)
self.check_resp_headers_in_create(resp)
self.check_resp_body(body, pm_expected_attrs)
pm_job_id = body.get('id')
# 4. PMJob-Delete
resp, body = self.delete_pm_job(pm_job_id)
self.assertEqual(204, resp.status_code)
self.check_resp_headers_in_delete(resp)
# 5. LCM-Terminate: Terminate VNF
terminate_req = paramgen.terminate_vnf_min()
resp, body = self.terminate_vnf_instance(inst_id, terminate_req)
self.assertEqual(202, resp.status_code)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
# wait a bit because there is a bit time lag between lcmocc DB
# update and terminate completion.
time.sleep(10)
# check instantiationState of VNF
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
self.assertEqual(fields.VnfInstanceState.NOT_INSTANTIATED,
body['instantiationState'])
# 6. LCM-Delete: Delete a VNF instance
resp, body = self.delete_vnf_instance(inst_id)
self.assertEqual(204, resp.status_code)
# check deletion of VNF instance
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(404, resp.status_code)

View File

@ -20,6 +20,7 @@ import paramiko
import sys
import webob
from tacker.common import utils
from tacker import context
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import http_client
@ -89,13 +90,13 @@ _body_pm_alert5['labels']['metric'] = 'ByteIncomingVnfIntCp'
_body_pm_alert6 = copy.deepcopy(_body_pm_alert1)
_body_pm_alert6['labels']['metric'] = 'InvalidMetric'
_body_pm1 = copy.copy(_body_base)
_body_pm1 = copy.deepcopy(_body_base)
_body_pm1.update({
'alerts': [
_body_pm_alert1, _body_pm_alert2, _body_pm_alert3, _body_pm_alert4]
})
_body_pm2 = copy.copy(_body_base)
_body_pm2 = copy.deepcopy(_body_base)
_body_pm2.update({
'alerts': [_body_pm_alert5, _body_pm_alert6]
})
@ -236,7 +237,7 @@ _inst_base = {
'instantiationState': 'NOT_INSTANTIATED',
}
_inst1 = copy.copy(_inst_base)
_inst1 = copy.deepcopy(_inst_base)
_inst1.update({
'instantiatedVnfInfo': {
'id': 'id',
@ -261,6 +262,11 @@ _inst1.update({
}
})
_inst2 = copy.deepcopy(_inst1)
_inst2['instantiatedVnfInfo']['metadata'] = {
'namespace': 'test'
}
datetime_test = datetime.datetime.fromisoformat(
'2022-06-22T01:23:45.678Z'.replace('Z', '+00:00'))
@ -273,12 +279,32 @@ def unload_uuidsentinel():
class _ParamikoTest():
def __init__(self):
channel = None
exp = None
recv_exit_status_value = 0
def __init__(self, exp=None, recv_exit_status_value=0):
self.channel = self
self.exp = exp
self.recv_exit_status_value = recv_exit_status_value
pass
def connect(self, **kwargs):
def set_missing_host_key_policy(self, arg1):
pass
def exec_command(self, *args):
return None, self, self
def recv_exit_status(self):
return self.recv_exit_status_value
def read(self):
return b'test'
def connect(self, *args, **kwargs):
if self.exp:
raise self.exp
def remove(self, arg1):
pass
@ -469,15 +495,23 @@ class TestPrometheusPluginPm(base.TestCase):
# error
resp.status_code = 503
pp.delete_job(context=self.context, pm_job=job)
# paramiko error
resp.status_code = 202
mock_paramiko.return_value = _ParamikoTest(
exp=sol_ex.PrometheusPluginError())
pp.delete_job(context=self.context, pm_job=job)
@mock.patch.object(paramiko, 'SSHClient')
@mock.patch.object(http_client.HttpClient, 'do_request')
@mock.patch.object(paramiko.SFTPClient, 'from_transport')
@mock.patch.object(paramiko, 'Transport')
@mock.patch.object(inst_utils, 'get_inst')
def test_create_job(
self, mock_inst, mock_paramiko, mock_sftp, mock_do_request):
self, mock_inst, mock_paramiko, mock_sftp, mock_do_request,
mock_sshclient):
mock_paramiko.return_value = _ParamikoTest()
mock_sftp.return_value = _ParamikoTest()
mock_sshclient.return_value = _ParamikoTest()
resp = webob.Response()
resp.status_code = 202
mock_do_request.return_value = resp, {}
@ -485,6 +519,8 @@ class TestPrometheusPluginPm(base.TestCase):
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
self.config_fixture.config(
group='prometheus_plugin', test_rule_with_promtool=True)
pp = mon_base.MonitoringPlugin.get_instance(
prometheus_plugin.PrometheusPluginPm)
# VirtualisedComputeResource
@ -496,6 +532,15 @@ class TestPrometheusPluginPm(base.TestCase):
rule = pp.create_job(context=self.context, pm_job=job)
self.assertTrue(len(rule['groups'][0]['rules']) > 0)
self.assertTrue('interface="*"' in str(rule))
# namespace
job = objects.PmJobV2.from_dict(_pm_job)
rule = pp.create_job(context=self.context, pm_job=job)
self.assertTrue('namespace="default"' in str(rule))
self.assertFalse('namespace="test"' in str(rule))
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst2)
rule = pp.create_job(context=self.context, pm_job=job)
self.assertFalse('namespace="default"' in str(rule))
self.assertTrue('namespace="test"' in str(rule))
@mock.patch.object(http_client.HttpClient, 'do_request')
@mock.patch.object(paramiko.SFTPClient, 'from_transport')
@ -663,6 +708,111 @@ class TestPrometheusPluginPm(base.TestCase):
pp.create_job, context=self.context, pm_job=job
)
@mock.patch.object(paramiko, 'SSHClient')
@mock.patch.object(http_client.HttpClient, 'do_request')
@mock.patch.object(paramiko.SFTPClient, 'from_transport')
@mock.patch.object(paramiko, 'Transport')
@mock.patch.object(inst_utils, 'get_inst')
def test_create_job_uploading_error(
self, mock_inst, mock_paramiko, mock_sftp, mock_do_request,
mock_sshclient):
mock_paramiko.return_value = _ParamikoTest()
mock_sftp.return_value = _ParamikoTest()
exp = ValueError("test_create_job_error2")
mock_sshclient.return_value = _ParamikoTest(
exp=exp, recv_exit_status_value=1)
resp = webob.Response()
resp.status_code = 202
mock_do_request.return_value = resp, {}
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst1)
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
self.config_fixture.config(
group='prometheus_plugin', test_rule_with_promtool=True)
pp = mon_base.MonitoringPlugin.get_instance(
prometheus_plugin.PrometheusPluginPm)
# upload error
job = objects.PmJobV2.from_dict(_pm_job)
self.assertRaises(
ValueError,
pp.create_job, context=self.context, pm_job=job)
exp = sol_ex.PrometheusPluginError("test_create_job_error2")
mock_paramiko.return_value = _ParamikoTest(exp=exp)
mock_sshclient.return_value = _ParamikoTest(
exp=exp, recv_exit_status_value=1)
self.assertRaises(
sol_ex.PrometheusPluginError,
pp.create_job, context=self.context, pm_job=job)
@mock.patch.object(utils, 'find_config_file')
@mock.patch.object(paramiko, 'SSHClient')
@mock.patch.object(http_client.HttpClient, 'do_request')
@mock.patch.object(paramiko.SFTPClient, 'from_transport')
@mock.patch.object(paramiko, 'Transport')
@mock.patch.object(inst_utils, 'get_inst')
def test_promql_config_file_missing(
self, mock_inst, mock_paramiko, mock_sftp, mock_do_request,
mock_sshclient, mock_utils):
mock_paramiko.return_value = _ParamikoTest()
mock_sftp.return_value = _ParamikoTest()
mock_sshclient.return_value = _ParamikoTest(recv_exit_status_value=1)
resp = webob.Response()
resp.status_code = 202
mock_do_request.return_value = resp, {}
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst1)
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
self.config_fixture.config(
group='prometheus_plugin', test_rule_with_promtool=True)
pp = mon_base.MonitoringPlugin.get_instance(
prometheus_plugin.PrometheusPluginPm)
# no config file
mock_utils.return_value = None
job = objects.PmJobV2.from_dict(_pm_job)
self.assertRaises(
sol_ex.PrometheusPluginError,
pp.create_job, context=self.context, pm_job=job
)
# Type check
mock_utils.return_value = None
pp.make_rule("Threshold", "id", "id", "id", "metric", "exp")
self.assertRaises(
sol_ex.PrometheusPluginError,
pp.make_rule, "TypeError", "id", "id", "id", "metric", "exp"
)
@mock.patch.object(paramiko, 'SSHClient')
@mock.patch.object(http_client.HttpClient, 'do_request')
@mock.patch.object(paramiko.SFTPClient, 'from_transport')
@mock.patch.object(paramiko, 'Transport')
@mock.patch.object(inst_utils, 'get_inst')
def test_promql(
self, mock_inst, mock_paramiko, mock_sftp, mock_do_request,
mock_sshclient):
mock_paramiko.return_value = _ParamikoTest()
mock_sftp.return_value = _ParamikoTest()
mock_sshclient.return_value = _ParamikoTest(recv_exit_status_value=1)
resp = webob.Response()
resp.status_code = 202
mock_do_request.return_value = resp, {}
mock_inst.return_value = objects.VnfInstanceV2.from_dict(_inst1)
self.config_fixture.config(
group='prometheus_plugin', performance_management=True)
self.config_fixture.config(
group='prometheus_plugin', test_rule_with_promtool=True)
pp = mon_base.MonitoringPlugin.get_instance(
prometheus_plugin.PrometheusPluginPm)
job = objects.PmJobV2.from_dict(_pm_job)
self.assertRaises(
sol_ex.PrometheusPluginError,
pp.create_job, context=self.context, pm_job=job
)
class TestPrometheusPluginFm(base.TestCase):
def setUp(self):

View File

@ -4,9 +4,7 @@
# for functional testing.
#
cd /opt/stack/tacker/tacker/tests/functional/sol_kubernetes_v2/samples/
sudo unzip tacker-monitoring-test.zip
cd ./tacker-monitoring-test
cd /opt/stack/tacker/tacker/tests/functional/sol_kubernetes_v2/samples/tacker-monitoring-test
sudo docker build -t tacker-monitoring-test .
sudo docker run -v ${PWD}/src:/work/src -v ${PWD}/rules:/etc/prometheus/rules \