mistral-extra/mistral_extra/monitoring/monitoring_server.py
Vasudeo Nimbekar 3385694dd5 Add monitoring server to mistral-extra
This feature will allow user to monitor mistral services and important metrics like operations related to tasks, executions, workflows, etc.
1. Added Monitoring server which can run similar to other mistral components by enabling monitoring and recovery jobs config options.
2. Monitoring plugin can collect metrics from the mistral and publish it in the Prometheus compatible format.
3. By using this monitoring plugin user can attach monitoring tools to the Mistral like Prometheus and Grafana to view metrics in a dashboard.
4. Added recovery jobs to recover from different failing or stucked conditions for eg. removing frozen named locks, refresh state of tasks stucked in waiting state long time, expired sub workflow tasks, stucked workflows, etc.

Implements: blueprint add-mistral-monitoring-plugin

Change-Id: Idbb6de9084504448befb9e346da4f458eb6c5a17
2023-05-26 14:39:34 +05:30

114 lines
3.6 KiB
Python

# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from flask import Flask
from flask import jsonify
from flask import Response
from importlib_metadata import entry_points
from mistral.service import base as service_base
from mistral_extra.monitoring.prometheus import format_to_prometheus
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def get_oslo_service(setup_profiler=True):
return MonitoringServer(
setup_profiler=setup_profiler
)
class MonitoringServer(service_base.MistralService):
def __init__(self, setup_profiler=True):
super(MonitoringServer, self).__init__(
'monitoring_group',
setup_profiler
)
collectors = entry_points(group='monitoring.metric_collector')
self._metric_collectors = [collector.load()()
for collector in collectors]
self._jobs = []
self._standard_tags = {}
self._metrics = {}
self._prometheus_formatted_metrics = []
self._last_updated = None
self._timedelta = datetime.timedelta(
seconds=CONF.monitoring.metric_collection_interval
)
self.app = Flask(__name__)
self.app.add_url_rule('/metrics', 'metrics', self.metrics)
self.app.add_url_rule('/health', 'health', self.health)
def collect_metrics(self, to_json=False):
now = datetime.datetime.now()
if not self._last_updated or self._outdated(now):
metrics = []
for collector in self._metric_collectors:
metrics.extend(collector.collect())
for metric in metrics:
metric.tags.update(self._standard_tags)
self._metrics = metrics
self._last_updated = now
if to_json:
return list(map(lambda x: x.__dict__, self._metrics))
return self._metrics
def _outdated(self, now):
return self._last_updated <= now - self._timedelta
def _get_prometheus_metrics(self):
metrics = self.collect_metrics(to_json=True)
pr_metrics = format_to_prometheus(metrics)
return ''.join([line.decode('utf-8') for line in pr_metrics])
def metrics(self):
with self.app.app_context():
m = self._get_prometheus_metrics()
return Response(m, 200, content_type='text/plain')
def health(self):
with self.app.app_context():
return jsonify({'status': 'UP'})
def _init_monitoring_jobs(self):
if CONF.recovery_job.enabled:
recovery_jobs = entry_points(group='monitoring.recovery_jobs')
for job in recovery_jobs:
recovery_job = job.load()()
self._jobs.append(recovery_job)
recovery_job.start()
def start(self):
super(MonitoringServer, self).start()
self._init_monitoring_jobs()
self.app.run(host="0.0.0.0", port=9090)
def stop(self, graceful=False):
super(MonitoringServer, self).stop()
for job in self._jobs:
job.stop(graceful)