347 lines
13 KiB
Python
Executable File
347 lines
13 KiB
Python
Executable File
# Copyright 2015 Carnegie Mellon University
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from oslo_config import cfg
|
|
from oslo_log import log
|
|
from stevedore import driver
|
|
import threading
|
|
import time
|
|
|
|
from kiloeyes.common import es_conn
|
|
from kiloeyes.common import kafka_conn
|
|
from kiloeyes.common import namespace
|
|
from kiloeyes.openstack.common import service as os_service
|
|
|
|
lock = threading.RLock()
|
|
|
|
THRESHOLD_ENGINE_OPTS = [
|
|
cfg.StrOpt('metrics_topic',
|
|
default='metrics',
|
|
help='topics to read metrics'),
|
|
cfg.StrOpt('alarm_topic',
|
|
default='alarm',
|
|
help='topic to send alarms'),
|
|
cfg.StrOpt('processor',
|
|
default='thresholding_processor',
|
|
help='the thresh processor'),
|
|
cfg.IntOpt('check_alarm_interval',
|
|
default=60)
|
|
]
|
|
ALARM_DEFINITION_OPTS = [
|
|
cfg.StrOpt('doc_type', default='alarmdefinitions',
|
|
help='The doc_type that alarm definitions will be saved to.'),
|
|
cfg.StrOpt('index_strategy', default='fixed',
|
|
help='The index strategy used to create index name.'),
|
|
cfg.StrOpt('index_prefix', default='data_',
|
|
help='The index prefix where metrics were saved to.'),
|
|
cfg.IntOpt('size', default=1000,
|
|
help=('The query result limit. Any result set more than '
|
|
'the limit will be discarded.')),
|
|
cfg.StrOpt('name', default='',
|
|
help='The name for query alarm definitions.'),
|
|
cfg.StrOpt('dimensions', default='',
|
|
help='The dimensions for query alarm definitions.'),
|
|
cfg.IntOpt('check_alarm_def_interval',
|
|
default=120)
|
|
]
|
|
|
|
|
|
cfg.CONF.register_opts(THRESHOLD_ENGINE_OPTS, group="thresholdengine")
|
|
cfg.CONF.register_opts(ALARM_DEFINITION_OPTS, group="alarmdefinitions")
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
class AlarmPublisher(threading.Thread):
|
|
"""The thread to publish alarm messages.
|
|
|
|
This class will periodically call processors to get alarms produced,
|
|
and send them into kafka
|
|
"""
|
|
def __init__(self, t_name, tp):
|
|
threading.Thread.__init__(self, name=t_name)
|
|
# init kafka connection to alarm topic
|
|
self._publish_kafka_conn = None
|
|
topic = cfg.CONF.thresholdengine.alarm_topic
|
|
self._publish_kafka_conn = (
|
|
kafka_conn.KafkaConnection(topic))
|
|
# set time interval for calling processors to refresh alarms
|
|
self.interval = cfg.CONF.thresholdengine.check_alarm_interval
|
|
self.threshold_processors = tp
|
|
|
|
def send_alarm(self):
|
|
if self._publish_kafka_conn:
|
|
if lock.acquire():
|
|
for aid in self.threshold_processors:
|
|
# get alarms produced by each processor
|
|
for alarm in (self.threshold_processors
|
|
[aid]['processor'].process_alarms()):
|
|
LOG.debug(alarm)
|
|
self._publish_kafka_conn.send_messages(alarm)
|
|
lock.release()
|
|
|
|
def run(self):
|
|
while True:
|
|
try:
|
|
self.send_alarm()
|
|
time.sleep(self.interval)
|
|
except Exception:
|
|
LOG.exception(
|
|
'Error occurred while publishing alarm messages.')
|
|
|
|
def stop(self):
|
|
self._publish_kafka_conn.close()
|
|
|
|
|
|
class MetricsConsumer(threading.Thread):
|
|
"""The thread to read metrics.
|
|
|
|
This class will get metrics messages from kafka,
|
|
and deliver them to processors.
|
|
"""
|
|
def __init__(self, t_name, tp):
|
|
threading.Thread.__init__(self, name=t_name)
|
|
# init kafka connection to metrics topic
|
|
self._consume_kafka_conn = None
|
|
topic = cfg.CONF.thresholdengine.metrics_topic
|
|
self._consume_kafka_conn = kafka_conn.KafkaConnection(topic)
|
|
self.threshold_processors = tp
|
|
|
|
def read_metrics(self):
|
|
def consume_metrics():
|
|
if lock.acquire():
|
|
# send metrics to each processor
|
|
for aid in self.threshold_processors:
|
|
processor = self.threshold_processors[aid]['processor']
|
|
processor.process_metrics(msg.message.value)
|
|
lock.release()
|
|
|
|
if self._consume_kafka_conn:
|
|
for msg in self._consume_kafka_conn.get_messages():
|
|
if msg and msg.message:
|
|
LOG.debug(msg.message.value)
|
|
consume_metrics()
|
|
self._consume_kafka_conn.commit()
|
|
|
|
def run(self):
|
|
while True:
|
|
try:
|
|
self.read_metrics()
|
|
except Exception:
|
|
LOG.exception('Error occurred while reading metrics messages.')
|
|
|
|
def stop(self):
|
|
self._consume_kafka_conn.close()
|
|
|
|
|
|
class AlarmDefinitionConsumer(threading.Thread):
|
|
"""The thread to process alarm definitions.
|
|
|
|
This class will get alarm definition messages from kafka,
|
|
Then init new processor, update existing processor or delete processor
|
|
according to the request.
|
|
"""
|
|
def __init__(self, t_name, tp):
|
|
threading.Thread.__init__(self, name=t_name)
|
|
|
|
self.doc_type = cfg.CONF.alarmdefinitions.doc_type
|
|
self.size = cfg.CONF.alarmdefinitions.size
|
|
# load index strategy
|
|
if cfg.CONF.alarmdefinitions.index_strategy:
|
|
self.index_strategy = driver.DriverManager(
|
|
namespace.STRATEGY_NS,
|
|
cfg.CONF.alarmdefinitions.index_strategy,
|
|
invoke_on_load=True,
|
|
invoke_kwds={}).driver
|
|
LOG.debug(self.index_strategy)
|
|
else:
|
|
self.index_strategy = None
|
|
self.index_prefix = cfg.CONF.alarmdefinitions.index_prefix
|
|
self._es_conn = es_conn.ESConnection(
|
|
self.doc_type, self.index_strategy, self.index_prefix)
|
|
# get the dict where all processors are indexed
|
|
self.threshold_processors = tp
|
|
# get the time interval to query es
|
|
self.interval = cfg.CONF.alarmdefinitions.check_alarm_def_interval
|
|
# set the flag, which is used to determine if a processor is expired
|
|
# each processor will has its flag, if same with self.flag, it's valid
|
|
self.flag = 0
|
|
# setup query params
|
|
self.params = self._build_alarm_definitions_query(
|
|
cfg.CONF.alarmdefinitions.name,
|
|
cfg.CONF.alarmdefinitions.dimensions)
|
|
|
|
def _build_alarm_definitions_query(self, name, dimensions):
|
|
query = {}
|
|
queries = []
|
|
field_string = 'alarmdefinitions.expression_data.dimensions.'
|
|
if dimensions:
|
|
# add dimensions to query params
|
|
current_dimension_split = (
|
|
dimensions.split(','))
|
|
for current_dimension in current_dimension_split:
|
|
current_dimen_data = current_dimension.split(':')
|
|
queries.append({'query_string': {
|
|
'default_field': (field_string +
|
|
current_dimen_data[0]),
|
|
'query': current_dimen_data[1]}})
|
|
if name:
|
|
# add name to query params
|
|
queries.append({'query_string': {
|
|
'default_field': 'name', 'query': name}})
|
|
# finally build the query
|
|
query = {'query': {'bool': {'must': queries}}}
|
|
return query
|
|
|
|
def _get_alarm_definitions_response(self, res):
|
|
if res and res.status_code == 200:
|
|
obj = res.json()
|
|
if obj:
|
|
return obj.get('hits')
|
|
return None
|
|
|
|
def get_alarm_definitions(self):
|
|
"""Get alarm definitions from es."""
|
|
es_res = self._es_conn.get_messages(self.params)
|
|
es_res = self._get_alarm_definitions_response(es_res)
|
|
if es_res is None:
|
|
return None
|
|
LOG.debug('Query to ElasticSearch returned: %s' % es_res)
|
|
if es_res["hits"]:
|
|
return es_res["hits"]
|
|
else:
|
|
return []
|
|
|
|
def refresh_alarm_processors(self):
|
|
def create_alarm_processor():
|
|
# make sure received a new alarm definition
|
|
if aid in self.threshold_processors:
|
|
LOG.debug('already exsist alarm definition')
|
|
return
|
|
# init a processor for this alarm definition
|
|
temp_processor = (
|
|
driver.DriverManager(
|
|
namespace.PROCESSOR_NS,
|
|
cfg.CONF.thresholdengine.processor,
|
|
invoke_on_load=True,
|
|
invoke_args=(alarm_def,)).driver)
|
|
# register this new processor
|
|
self.threshold_processors[aid] = {}
|
|
self.threshold_processors[aid]['processor'] = (
|
|
temp_processor)
|
|
self.threshold_processors[aid]['flag'] = self.flag
|
|
self.threshold_processors[aid]['json'] = alarm_def
|
|
|
|
def update_alarm_processor():
|
|
# update the processor when alarm definition is changed
|
|
updated = False
|
|
if aid in self.threshold_processors:
|
|
updated = (self.threshold_processors
|
|
[aid]['processor']
|
|
.update_thresh_processor(alarm_def))
|
|
self.threshold_processors[aid]['json'] = alarm_def
|
|
if updated:
|
|
LOG.debug('alarm definition updates successfully!')
|
|
else:
|
|
LOG.debug('alarm definition update fail!')
|
|
|
|
def delete_alarm_processor():
|
|
# delete related processor when an alarm definition is deleted
|
|
if aid in self.threshold_processors:
|
|
self.threshold_processors.pop(aid)
|
|
|
|
self.flag = 1 - self.flag
|
|
# get all alarm definitions from es to update those in the engine
|
|
alarm_definitions = self.get_alarm_definitions()
|
|
# http request fails, do nothing
|
|
if alarm_definitions is None:
|
|
return
|
|
if lock.acquire():
|
|
for alarm_definition in alarm_definitions:
|
|
alarm_def = alarm_definition['_source']
|
|
aid = alarm_def['id']
|
|
if aid in self.threshold_processors:
|
|
# alarm definition is updated
|
|
if alarm_def != self.threshold_processors[aid]['json']:
|
|
update_alarm_processor()
|
|
self.threshold_processors[aid]['flag'] = self.flag
|
|
else:
|
|
# comes a new alarm definition
|
|
create_alarm_processor()
|
|
for aid in self.threshold_processors.keys():
|
|
if self.threshold_processors[aid]['flag'] != self.flag:
|
|
# the alarm definition is expired
|
|
delete_alarm_processor()
|
|
lock.release()
|
|
|
|
def run(self):
|
|
while True:
|
|
try:
|
|
self.refresh_alarm_processors()
|
|
time.sleep(self.interval)
|
|
except Exception:
|
|
LOG.exception('Error occurred '
|
|
'while reading alarm definitions.')
|
|
|
|
def stop(self):
|
|
pass
|
|
|
|
|
|
class ThresholdEngine(os_service.Service):
|
|
def __init__(self, threads=1000):
|
|
super(ThresholdEngine, self).__init__(threads)
|
|
# dict to index all the processors,
|
|
# key = alarm def id; value = processor
|
|
self.threshold_processors = {}
|
|
# init threads for processing metrics, alarm definition and alarm
|
|
try:
|
|
self.thread_alarm = AlarmPublisher(
|
|
'alarm_publisher',
|
|
self.threshold_processors)
|
|
except Exception:
|
|
self.thread_alarm = None
|
|
try:
|
|
self.thread_alarm_def = AlarmDefinitionConsumer(
|
|
'alarm_def_consumer',
|
|
self.threshold_processors)
|
|
except Exception:
|
|
self.thread_alarm_def = None
|
|
try:
|
|
self.thread_metrics = MetricsConsumer(
|
|
'metrics_consumer',
|
|
self.threshold_processors)
|
|
except Exception:
|
|
self.thread_metrics = None
|
|
|
|
def start(self):
|
|
try:
|
|
self.thread_alarm.start()
|
|
self.thread_alarm_def.start()
|
|
self.thread_metrics.start()
|
|
LOG.info('Starting thresh processing succeed!')
|
|
except Exception:
|
|
LOG.debug('Starting thresh processing failed!')
|
|
|
|
def stop(self):
|
|
try:
|
|
if self.thread_alarm:
|
|
self.thread_alarm.stop()
|
|
if self.thread_alarm_def:
|
|
self.thread_alarm_def.stop()
|
|
if self.thread_metrics:
|
|
self.thread_metrics.stop()
|
|
except Exception:
|
|
LOG.debug('Terminate thresh process threads error')
|
|
super(ThresholdEngine, self).stop()
|