vitrage/vitrage/persistency/service.py

211 lines
7.8 KiB
Python

# Copyright 2017 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor
import dateutil.parser
from futurist import periodics
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as oslo_m
from oslo_utils import timeutils
from vitrage.common.constants import EdgeProperties as EProps
from vitrage.common.constants import ElementProperties as ElementProps
from vitrage.common.constants import HistoryProps as HProps
from vitrage.common.constants import NotifierEventTypes as NETypes
from vitrage.common.constants import VertexProperties as VProps
from vitrage.common.utils import spawn
from vitrage.coordination import service as coord
from vitrage import messaging
from vitrage.storage.sqlalchemy import models
from vitrage.utils.datetime import utcnow
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class PersistorService(coord.Service):
def __init__(self, worker_id, db_connection):
super(PersistorService, self).__init__(worker_id)
self.db_connection = db_connection
transport = messaging.get_transport()
target = \
oslo_m.Target(topic=CONF.persistency.persistor_topic)
self.listener = messaging.get_notification_listener(
transport, [target],
[VitragePersistorEndpoint(self.db_connection)])
self.scheduler = Scheduler(db_connection)
def run(self):
super(PersistorService, self).run()
LOG.info("Vitrage Persistor Service - Starting...")
self.listener.start()
self.scheduler.start_periodic_tasks()
LOG.info("Vitrage Persistor Service - Started!")
def terminate(self):
super(PersistorService, self).terminate()
LOG.info("Vitrage Persistor Service - Stopping...")
self.listener.stop()
self.listener.wait()
LOG.info("Vitrage Persistor Service - Stopped!")
class VitragePersistorEndpoint(object):
def __init__(self, db_connection):
self.db = db_connection
self.event_type_to_writer = {
NETypes.ACTIVATE_ALARM_EVENT: self._persist_activated_alarm,
NETypes.DEACTIVATE_ALARM_EVENT: self._persist_deactivate_alarm,
NETypes.ACTIVATE_CAUSAL_RELATION: self._persist_activate_edge,
NETypes.DEACTIVATE_CAUSAL_RELATION: self._persist_deactivate_edge,
NETypes.CHANGE_IN_ALARM_EVENT: self._persist_change,
NETypes.CHANGE_PROJECT_ID_EVENT: self._persist_alarm_proj_change,
}
def info(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.debug('Event_type: %s Payload %s', event_type, payload)
self.process_event(event_type, payload)
def process_event(self, event_type, payload):
writer = self.event_type_to_writer.get(event_type)
if not writer:
LOG.warning('Unrecognized event_type: %s', event_type)
return
writer(event_type, payload)
def _persist_activated_alarm(self, event_type, data):
event_timestamp = self.event_time(data)
alarm_row = \
models.Alarm(
vitrage_id=data.get(VProps.VITRAGE_ID),
start_timestamp=event_timestamp,
name=data.get(VProps.NAME),
vitrage_type=data.get(VProps.VITRAGE_TYPE),
vitrage_aggregated_severity=data.get(
VProps.VITRAGE_AGGREGATED_SEVERITY),
vitrage_operational_severity=data.get(
VProps.VITRAGE_OPERATIONAL_SEVERITY),
project_id=data.get(VProps.PROJECT_ID),
vitrage_resource_type=data.get(VProps.VITRAGE_RESOURCE_TYPE),
vitrage_resource_id=data.get(VProps.VITRAGE_RESOURCE_ID),
vitrage_resource_project_id=data.get(
VProps.VITRAGE_RESOURCE_PROJECT_ID),
payload=data)
self.db.alarms.create(alarm_row)
def _persist_deactivate_alarm(self, event_type, data):
vitrage_id = data.get(VProps.VITRAGE_ID)
event_timestamp = self.event_time(data)
self.db.alarms.update(
vitrage_id, HProps.END_TIMESTAMP, event_timestamp)
def _persist_alarm_proj_change(self, event_type, data):
vitrage_id = data.get(VProps.VITRAGE_ID)
self.db.alarms.update(vitrage_id,
VProps.VITRAGE_RESOURCE_PROJECT_ID,
data.get(VProps.VITRAGE_RESOURCE_PROJECT_ID))
def _persist_activate_edge(self, event_type, data):
event_timestamp = self.event_time(data)
edge_row = \
models.Edge(
source_id=data.get(EProps.SOURCE_ID),
target_id=data.get(EProps.TARGET_ID),
label=data.get(EProps.RELATIONSHIP_TYPE),
start_timestamp=event_timestamp,
payload=data)
self.db.edges.create(edge_row)
def _persist_deactivate_edge(self, event_type, data):
event_timestamp = self.event_time(data)
source_id = data.get(EProps.SOURCE_ID)
target_id = data.get(EProps.TARGET_ID)
self.db.edges.update(
source_id, target_id, end_timestamp=event_timestamp)
def _persist_change(self, event_type, data):
event_timestamp = self.event_time(data)
change_row = \
models.Change(
vitrage_id=data.get(VProps.VITRAGE_ID),
timestamp=event_timestamp,
severity=data.get(VProps.VITRAGE_OPERATIONAL_SEVERITY),
payload=data)
self.db.changes.create(change_row)
@staticmethod
def event_time(data):
event_timestamp = \
dateutil.parser.parse(data.get(ElementProps.UPDATE_TIMESTAMP))
event_timestamp = timeutils.normalize_time(event_timestamp)
return event_timestamp
class Scheduler(object):
def __init__(self, db):
self.db = db
self.periodic = None
def start_periodic_tasks(self):
self.periodic = periodics.PeriodicWorker.create(
[], executor_factory=lambda: ThreadPoolExecutor(max_workers=10))
self.add_events_table_expirer_timer()
self.add_history_tables_expirer_timer()
spawn(self.periodic.start)
def add_events_table_expirer_timer(self):
spacing = 60
@periodics.periodic(spacing=spacing)
def expirer_periodic():
try:
event_id = self.db.graph_snapshots.query_snapshot_event_id()
if event_id:
LOG.debug('Table events - deleting event id=%s', event_id)
self.db.events.delete(event_id)
except Exception:
LOG.exception('Table events - periodic cleanup run failed.')
self.periodic.add(expirer_periodic)
LOG.info("Table events - periodic cleanup started (%ss)", spacing)
def add_history_tables_expirer_timer(self):
spacing = 60
@periodics.periodic(spacing=spacing)
def expirer_periodic():
expire_by = \
utcnow(with_timezone=False) - \
timedelta(days=CONF.persistency.alarm_history_ttl)
try:
self.db.alarms.delete_expired(expire_by)
except Exception:
LOG.exception('History tables - periodic cleanup run failed.')
self.periodic.add(expirer_periodic)
LOG.info("History tables - periodic cleanup started (%ss)", spacing)