Merge "alarm and RCA history support"
This commit is contained in:
commit
fe8e50721b
@ -11,6 +11,7 @@
|
||||
License for the specific language governing permissions and limitations
|
||||
under the License.
|
||||
|
||||
|
||||
Vitrage API
|
||||
-----------
|
||||
|
|
||||
@ -504,6 +505,16 @@ Query Parameters
|
||||
|
||||
vitrage_id - (string(255)) get alarm on this resource can be 'all' for all alarms.
|
||||
|
||||
Optional Parameters:
|
||||
|
||||
- limit - (int) maximum number of items to return, if limit=0 the method will return all matched items in alarms table.
|
||||
- sort_by - (array of string(255)) array of attributes by which results should be sorted.
|
||||
- sort_dirs - (array of string(255)) per-column array of sort_dirs,corresponding to sort_keys ('asc' or 'desc').
|
||||
- filter_by - (array of string(255)) array of attributes by which results will be filtered
|
||||
- filter_vals - (array of string(255)) per-column array of filter values corresponding to filter_by.
|
||||
- next_page - (bool) if True will return next page when marker is given, if False will return previous page when marker is given, otherwise, returns first page if no marker was given.
|
||||
- marker - ((string(255)) if None returns first page, else if vitrage_id is given and next_page is True, return next #limit results after marker, else, if next page is False, return #limit results before marker.
|
||||
|
||||
Request Body
|
||||
============
|
||||
|
||||
|
@ -18,14 +18,13 @@ import pecan
|
||||
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_utils.strutils import bool_from_string
|
||||
from osprofiler import profiler
|
||||
from pecan.core import abort
|
||||
|
||||
from vitrage.api.controllers.rest import RootRestController
|
||||
from vitrage.api.controllers.v1.alarm_base import BaseAlarmsController
|
||||
from vitrage.api.controllers.v1 import count
|
||||
from vitrage.api.controllers.v1 import history
|
||||
from vitrage.api.policy import enforce
|
||||
from vitrage.common.constants import TenantProps
|
||||
from vitrage.common.constants import VertexProperties as Vprops
|
||||
|
||||
|
||||
@ -35,44 +34,19 @@ LOG = log.getLogger(__name__)
|
||||
# noinspection PyBroadException
|
||||
@profiler.trace_cls("alarm controller",
|
||||
info={}, hide_args=False, trace_private=False)
|
||||
class AlarmsController(RootRestController):
|
||||
class AlarmsController(BaseAlarmsController):
|
||||
count = count.CountsController()
|
||||
history = history.HistoryController()
|
||||
|
||||
@pecan.expose('json')
|
||||
def get_all(self, **kwargs):
|
||||
vitrage_id = kwargs.get(Vprops.VITRAGE_ID)
|
||||
all_tenants = kwargs.get(TenantProps.ALL_TENANTS, False)
|
||||
all_tenants = bool_from_string(all_tenants)
|
||||
if all_tenants:
|
||||
enforce("list alarms:all_tenants", pecan.request.headers,
|
||||
pecan.request.enforcer, {})
|
||||
else:
|
||||
enforce("list alarms", pecan.request.headers,
|
||||
pecan.request.enforcer, {})
|
||||
|
||||
LOG.info('returns list alarms with vitrage id %s', vitrage_id)
|
||||
kwargs['only_active_alarms'] = True
|
||||
|
||||
try:
|
||||
return self._get_alarms(vitrage_id, all_tenants)
|
||||
except Exception:
|
||||
LOG.exception('Failed to get alarms.')
|
||||
abort(404, 'Failed to get alarms.')
|
||||
LOG.info('returns alarms list with vitrage id %s',
|
||||
kwargs.get(Vprops.VITRAGE_ID))
|
||||
|
||||
@staticmethod
|
||||
def _get_alarms(vitrage_id=None, all_tenants=False):
|
||||
alarms_json = pecan.request.client.call(pecan.request.context,
|
||||
'get_alarms',
|
||||
vitrage_id=vitrage_id,
|
||||
all_tenants=all_tenants)
|
||||
LOG.info(alarms_json)
|
||||
|
||||
try:
|
||||
alarms_list = json.loads(alarms_json)['alarms']
|
||||
return alarms_list
|
||||
|
||||
except Exception:
|
||||
LOG.exception('Failed to open file.')
|
||||
abort(404, 'Failed to get alarms')
|
||||
return self._get_alarms(**kwargs)
|
||||
|
||||
@pecan.expose('json')
|
||||
def get(self, vitrage_id):
|
||||
|
79
vitrage/api/controllers/v1/alarm_base.py
Normal file
79
vitrage/api/controllers/v1/alarm_base.py
Normal file
@ -0,0 +1,79 @@
|
||||
# Copyright 2018 - Nokia Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import json
|
||||
from oslo_log import log
|
||||
from oslo_utils.strutils import bool_from_string
|
||||
import pecan
|
||||
from pecan.core import abort
|
||||
|
||||
from vitrage.api.controllers.rest import RootRestController
|
||||
from vitrage.api.policy import enforce
|
||||
from vitrage.common.constants import TenantProps
|
||||
from vitrage.common.constants import VertexProperties as Vprops
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
# noinspection PyBroadException
|
||||
class BaseAlarmsController(RootRestController):
|
||||
|
||||
@staticmethod
|
||||
def _get_alarms(**kwargs):
|
||||
vitrage_id = kwargs.get(Vprops.VITRAGE_ID)
|
||||
start = kwargs.get('start')
|
||||
end = kwargs.get('end')
|
||||
limit = kwargs.get('limit', 10000)
|
||||
sort_by = kwargs.get('sort_by', ['start_timestamp', 'vitrage_id'])
|
||||
sort_dirs = kwargs.get('sort_dirs', ['asc', 'asc'])
|
||||
filter_by = kwargs.get('filter_by', [])
|
||||
filter_vals = kwargs.get('filter_vals', [])
|
||||
next_page = kwargs.get('next_page', True)
|
||||
marker = kwargs.get('marker')
|
||||
only_active_alarms = kwargs.get('only_active_alarms', False)
|
||||
all_tenants = kwargs.get(TenantProps.ALL_TENANTS, False)
|
||||
all_tenants = bool_from_string(all_tenants)
|
||||
if all_tenants:
|
||||
enforce("list alarms:all_tenants", pecan.request.headers,
|
||||
pecan.request.enforcer, {})
|
||||
else:
|
||||
enforce("list alarms", pecan.request.headers,
|
||||
pecan.request.enforcer, {})
|
||||
|
||||
alarms_json = \
|
||||
pecan.request.client.call(pecan.request.context,
|
||||
'get_alarms',
|
||||
vitrage_id=vitrage_id,
|
||||
all_tenants=all_tenants,
|
||||
start=start,
|
||||
end=end,
|
||||
limit=limit,
|
||||
sort_by=sort_by,
|
||||
sort_dirs=sort_dirs,
|
||||
filter_by=filter_by,
|
||||
filter_vals=filter_vals,
|
||||
next_page=next_page,
|
||||
marker=marker,
|
||||
only_active_alarms=only_active_alarms
|
||||
)
|
||||
|
||||
try:
|
||||
alarms_list = json.loads(alarms_json)['alarms']
|
||||
return alarms_list
|
||||
|
||||
except Exception:
|
||||
LOG.exception('Failed to get alarms')
|
||||
abort(404, 'Failed to get alarms')
|
32
vitrage/api/controllers/v1/history.py
Normal file
32
vitrage/api/controllers/v1/history.py
Normal file
@ -0,0 +1,32 @@
|
||||
# Copyright 2018 - Nokia Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log
|
||||
import pecan
|
||||
|
||||
from vitrage.api.controllers.v1.alarm_base import BaseAlarmsController
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
# noinspection PyBroadException
|
||||
class HistoryController(BaseAlarmsController):
|
||||
|
||||
@pecan.expose('json')
|
||||
def get_all(self, **kwargs):
|
||||
|
||||
LOG.info('returns history alarms')
|
||||
|
||||
return self._get_alarms(**kwargs)
|
@ -12,19 +12,20 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from dateutil import parser
|
||||
import json
|
||||
from oslo_log import log
|
||||
from osprofiler import profiler
|
||||
|
||||
from vitrage.api_handler.apis.base import ALARM_QUERY
|
||||
from vitrage.api_handler.apis.base import ALARMS_ALL_QUERY
|
||||
from vitrage.api_handler.apis.base import EntityGraphApisBase
|
||||
from vitrage.common.constants import EntityCategory as ECategory
|
||||
from vitrage.common.constants import HistoryProps as HProps
|
||||
from vitrage.common.constants import TenantProps
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
from vitrage.entity_graph.mappings.operational_alarm_severity import \
|
||||
OperationalAlarmSeverity
|
||||
|
||||
from vitrage.storage import db_time
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -33,34 +34,29 @@ LOG = log.getLogger(__name__)
|
||||
info={}, hide_args=False, trace_private=False)
|
||||
class AlarmApis(EntityGraphApisBase):
|
||||
|
||||
def __init__(self, entity_graph, conf):
|
||||
def __init__(self, entity_graph, conf, db):
|
||||
self.entity_graph = entity_graph
|
||||
self.conf = conf
|
||||
self.db = db
|
||||
|
||||
def get_alarms(self, ctx, vitrage_id, all_tenants):
|
||||
LOG.debug("AlarmApis get_alarms - vitrage_id: %s, all_tenants=%s",
|
||||
str(vitrage_id), all_tenants)
|
||||
def get_alarms(self, ctx, vitrage_id, all_tenants, *args, **kwargs):
|
||||
|
||||
project_id = ctx.get(TenantProps.TENANT, None)
|
||||
is_admin_project = ctx.get(TenantProps.IS_ADMIN, False)
|
||||
kwargs = self._parse_kwargs(kwargs)
|
||||
|
||||
if not vitrage_id or vitrage_id == 'all':
|
||||
if all_tenants:
|
||||
alarms = self.entity_graph.get_vertices(
|
||||
query_dict=ALARMS_ALL_QUERY)
|
||||
else:
|
||||
alarms = self._get_alarms(project_id, is_admin_project)
|
||||
alarms += self._get_alarms_via_resource(project_id,
|
||||
is_admin_project)
|
||||
alarms = set(alarms)
|
||||
if not all_tenants:
|
||||
kwargs['project_id'] = \
|
||||
ctx.get(TenantProps.TENANT, 'no-project')
|
||||
kwargs['is_admin_project'] = \
|
||||
ctx.get(TenantProps.IS_ADMIN, False)
|
||||
else:
|
||||
query = {VProps.VITRAGE_CATEGORY: ECategory.ALARM,
|
||||
VProps.VITRAGE_IS_DELETED: False}
|
||||
alarms = self.entity_graph.neighbors(vitrage_id,
|
||||
vertex_attr_filter=query)
|
||||
kwargs.get('filter_by', []).append(VProps.VITRAGE_RESOURCE_ID)
|
||||
kwargs.get('filter_vals', []).append(vitrage_id)
|
||||
|
||||
return json.dumps({'alarms': [v.properties for v in alarms]})
|
||||
alarms = self._get_alarms(*args, **kwargs)
|
||||
return json.dumps({'alarms': [v.payload for v in alarms]})
|
||||
|
||||
# TODO(annarez): add db support
|
||||
def show_alarm(self, ctx, vitrage_id):
|
||||
LOG.debug('Show alarm with vitrage_id: %s', vitrage_id)
|
||||
|
||||
@ -85,70 +81,58 @@ class AlarmApis(EntityGraphApisBase):
|
||||
is_admin_project = ctx.get(TenantProps.IS_ADMIN, False)
|
||||
|
||||
if all_tenants:
|
||||
alarms = self.entity_graph.get_vertices(
|
||||
query_dict=ALARMS_ALL_QUERY)
|
||||
counts = self.db.history_facade.count_active_alarms()
|
||||
|
||||
else:
|
||||
alarms = self._get_alarms(project_id, is_admin_project)
|
||||
alarms += self._get_alarms_via_resource(project_id,
|
||||
is_admin_project)
|
||||
alarms = set(alarms)
|
||||
|
||||
counts = {OperationalAlarmSeverity.SEVERE: 0,
|
||||
OperationalAlarmSeverity.CRITICAL: 0,
|
||||
OperationalAlarmSeverity.WARNING: 0,
|
||||
OperationalAlarmSeverity.OK: 0,
|
||||
OperationalAlarmSeverity.NA: 0}
|
||||
|
||||
for alarm in alarms:
|
||||
severity = alarm.get(VProps.VITRAGE_OPERATIONAL_SEVERITY)
|
||||
if severity:
|
||||
try:
|
||||
counts[severity] += 1
|
||||
except KeyError:
|
||||
pass
|
||||
counts = self.db.history_facade.count_active_alarms(
|
||||
project_id=project_id,
|
||||
is_admin_project=is_admin_project)
|
||||
|
||||
return json.dumps(counts)
|
||||
|
||||
def _get_alarms(self, project_id, is_admin_project):
|
||||
def _get_alarms(self, *args, **kwargs):
|
||||
"""Finds all the alarms with project_id
|
||||
|
||||
Finds all the alarms which has the project_id. In case the tenant is
|
||||
admin then project_id can also be None.
|
||||
|
||||
:type project_id: string
|
||||
:type is_admin_project: boolean
|
||||
:rtype: list
|
||||
"""
|
||||
|
||||
alarm_query = self._get_query_with_project(ECategory.ALARM,
|
||||
project_id,
|
||||
is_admin_project)
|
||||
alarms = self.entity_graph.get_vertices(query_dict=alarm_query)
|
||||
return self._filter_alarms(alarms, project_id)
|
||||
|
||||
def _get_alarms_via_resource(self, project_id, is_admin_project):
|
||||
"""Finds all the alarms with project_id on their resource
|
||||
|
||||
Finds all the resource which has project_id and return all the alarms
|
||||
on those resources project_id. In case the tenant is admin then
|
||||
project_id can also be None.
|
||||
|
||||
:type project_id: string
|
||||
:type is_admin_project: boolean
|
||||
:rtype: list
|
||||
"""
|
||||
|
||||
resource_query = self._get_query_with_project(ECategory.RESOURCE,
|
||||
project_id,
|
||||
is_admin_project)
|
||||
|
||||
alarms = []
|
||||
resources = self.entity_graph.get_vertices(query_dict=resource_query)
|
||||
|
||||
for resource in resources:
|
||||
new_alarms = \
|
||||
self.entity_graph.neighbors(
|
||||
resource.vertex_id, vertex_attr_filter=ALARM_QUERY)
|
||||
alarms = alarms + new_alarms
|
||||
alarms = self.db.history_facade.get_alarms(*args, **kwargs)
|
||||
if not kwargs.get('only_active_alarms'):
|
||||
for alarm in alarms:
|
||||
# change operational severity of ended alarms to 'OK'
|
||||
# TODO(annarez): in next version use 'state'
|
||||
if alarm.end_timestamp <= db_time():
|
||||
alarm.payload[VProps.VITRAGE_OPERATIONAL_SEVERITY] = \
|
||||
OperationalAlarmSeverity.OK
|
||||
for alarm in alarms:
|
||||
alarm.payload[HProps.START_TIMESTAMP] = str(alarm.start_timestamp)
|
||||
if alarm.end_timestamp <= db_time():
|
||||
alarm.payload[HProps.END_TIMESTAMP] = str(alarm.end_timestamp)
|
||||
|
||||
return alarms
|
||||
|
||||
def _parse_kwargs(self, kwargs):
|
||||
kwargs = {k: v for k, v in kwargs.items() if v is not None}
|
||||
|
||||
if kwargs.get('start'):
|
||||
kwargs['start'] = parser.parse(kwargs['start'])
|
||||
if kwargs.get('end'):
|
||||
kwargs['end'] = parser.parse(kwargs['end'])
|
||||
if kwargs.get('sort_by') and type(kwargs.get('sort_by')) != list:
|
||||
kwargs['sort_by'] = [kwargs.get('sort_by')]
|
||||
if kwargs.get('sort_dirs') and type(kwargs.get('sort_dirs')) != list:
|
||||
kwargs['sort_dirs'] = [kwargs.get('sort_dirs')]
|
||||
if str(kwargs.get('next_page')).lower() == 'false':
|
||||
kwargs['next_page'] = False
|
||||
else:
|
||||
kwargs['next_page'] = True
|
||||
|
||||
if kwargs.get('filter_by') and type(kwargs.get('filter_by')) != list:
|
||||
kwargs['filter_by'] = [kwargs.get('filter_by')]
|
||||
if kwargs.get('filter_vals') and type(
|
||||
kwargs.get('filter_vals')) != list:
|
||||
kwargs['filter_vals'] = [kwargs.get('filter_vals')]
|
||||
|
||||
return kwargs
|
||||
|
@ -21,8 +21,6 @@ from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE
|
||||
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
|
||||
from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE
|
||||
from vitrage.datasources import OPENSTACK_CLUSTER
|
||||
from vitrage.graph import Direction
|
||||
from vitrage.keystone_client import get_client as ks_client
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -57,12 +55,6 @@ TOPOLOGY_AND_ALARMS_QUERY = {
|
||||
]
|
||||
}
|
||||
|
||||
RCA_QUERY = {
|
||||
'and': [
|
||||
{'==': {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}},
|
||||
{'==': {VProps.VITRAGE_IS_DELETED: False}}
|
||||
]
|
||||
}
|
||||
|
||||
ALARMS_ALL_QUERY = {
|
||||
'and': [
|
||||
@ -71,12 +63,6 @@ ALARMS_ALL_QUERY = {
|
||||
]
|
||||
}
|
||||
|
||||
ALARM_QUERY = {
|
||||
VProps.VITRAGE_CATEGORY: EntityCategory.ALARM,
|
||||
VProps.VITRAGE_IS_DELETED: False,
|
||||
VProps.VITRAGE_IS_PLACEHOLDER: False
|
||||
}
|
||||
|
||||
EDGE_QUERY = {'==': {EProps.VITRAGE_IS_DELETED: False}}
|
||||
|
||||
RESOURCES_ALL_QUERY = {
|
||||
@ -142,81 +128,3 @@ class EntityGraphApisBase(object):
|
||||
query_with_project_id = {'and': [project_query, query]}
|
||||
|
||||
return query_with_project_id
|
||||
|
||||
def _filter_alarms(self, alarms, project_id):
|
||||
"""Remove wrong alarms from the list
|
||||
|
||||
Removes alarms where the project_id of the resource they sit on is
|
||||
different than the project_id sent as a parameter
|
||||
|
||||
:type alarms: list
|
||||
:type project_id: string
|
||||
:rtype: list
|
||||
"""
|
||||
|
||||
alarms_to_remove = []
|
||||
|
||||
for alarm in alarms:
|
||||
alarm_project_id = alarm.get(VProps.PROJECT_ID, None)
|
||||
if not alarm_project_id:
|
||||
cat_filter = {VProps.VITRAGE_CATEGORY: EntityCategory.RESOURCE}
|
||||
alarms_resource = \
|
||||
self.entity_graph.neighbors(alarm.vertex_id,
|
||||
vertex_attr_filter=cat_filter)
|
||||
if len(alarms_resource) > 0:
|
||||
resource_project_id = \
|
||||
alarms_resource[0].get(VProps.PROJECT_ID, None)
|
||||
if resource_project_id and \
|
||||
resource_project_id != project_id:
|
||||
alarms_to_remove.append(alarm)
|
||||
elif alarm_project_id != project_id:
|
||||
alarms_to_remove.append(alarm)
|
||||
|
||||
return [x for x in alarms if x not in alarms_to_remove]
|
||||
|
||||
def _is_alarm_of_current_project(self,
|
||||
entity,
|
||||
project_id,
|
||||
is_admin_project):
|
||||
"""Checks if the alarm is of the current tenant
|
||||
|
||||
Checks:
|
||||
1. checks if the project_id is the same
|
||||
2. if the tenant is admin then the projectid can be also None
|
||||
3. check the project_id of the resource where the alarm sits is the
|
||||
same as the project_id sent as a parameter
|
||||
|
||||
:type entity: vertex
|
||||
:type project_id: string
|
||||
:type is_admin_project: boolean
|
||||
:rtype: boolean
|
||||
"""
|
||||
|
||||
current_project_id = entity.get(VProps.PROJECT_ID, None)
|
||||
if current_project_id == project_id:
|
||||
return True
|
||||
elif not current_project_id and is_admin_project:
|
||||
return True
|
||||
else:
|
||||
entities = self.entity_graph.neighbors(entity.vertex_id,
|
||||
direction=Direction.OUT)
|
||||
for entity in entities:
|
||||
if entity[VProps.VITRAGE_CATEGORY] == EntityCategory.RESOURCE:
|
||||
resource_project_id = entity.get(VProps.PROJECT_ID)
|
||||
if resource_project_id == project_id or \
|
||||
(not resource_project_id and is_admin_project):
|
||||
return True
|
||||
return False
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _get_first(lst):
|
||||
if len(lst) == 1:
|
||||
return lst[0]
|
||||
else:
|
||||
return None
|
||||
|
||||
def _is_project_admin(self, project_id):
|
||||
keystone_client = ks_client(self.conf)
|
||||
project = keystone_client.projects.get(project_id)
|
||||
return 'name=admin' in project.to_dict()
|
||||
|
@ -15,14 +15,13 @@
|
||||
from oslo_log import log
|
||||
from osprofiler import profiler
|
||||
|
||||
|
||||
from vitrage.api_handler.apis.base import ALARMS_ALL_QUERY
|
||||
from vitrage.api_handler.apis.base import EDGE_QUERY
|
||||
from vitrage.api_handler.apis.base import EntityGraphApisBase
|
||||
from vitrage.api_handler.apis.base import RCA_QUERY
|
||||
from vitrage.common.constants import HistoryProps as HProps
|
||||
from vitrage.common.constants import TenantProps
|
||||
from vitrage.graph import Direction
|
||||
|
||||
from vitrage.graph.driver.networkx_graph import NXGraph
|
||||
from vitrage.graph import Edge
|
||||
from vitrage.graph import Vertex
|
||||
from vitrage.storage import db_time
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -31,9 +30,10 @@ LOG = log.getLogger(__name__)
|
||||
info={}, hide_args=False, trace_private=False)
|
||||
class RcaApis(EntityGraphApisBase):
|
||||
|
||||
def __init__(self, entity_graph, conf):
|
||||
def __init__(self, entity_graph, conf, db):
|
||||
self.entity_graph = entity_graph
|
||||
self.conf = conf
|
||||
self.db = db
|
||||
|
||||
def get_rca(self, ctx, root, all_tenants):
|
||||
LOG.debug("RcaApis get_rca - root: %s, all_tenants=%s",
|
||||
@ -41,110 +41,31 @@ class RcaApis(EntityGraphApisBase):
|
||||
|
||||
project_id = ctx.get(TenantProps.TENANT, None)
|
||||
is_admin_project = ctx.get(TenantProps.IS_ADMIN, False)
|
||||
ga = self.entity_graph.algo
|
||||
|
||||
found_graph_out = ga.graph_query_vertices(root,
|
||||
query_dict=RCA_QUERY,
|
||||
direction=Direction.OUT,
|
||||
edge_query_dict=EDGE_QUERY)
|
||||
found_graph_in = ga.graph_query_vertices(root,
|
||||
query_dict=RCA_QUERY,
|
||||
direction=Direction.IN,
|
||||
edge_query_dict=EDGE_QUERY)
|
||||
|
||||
if all_tenants:
|
||||
unified_graph = found_graph_in
|
||||
unified_graph.union(found_graph_out)
|
||||
db_nodes, db_edges = self.db.history_facade.alarm_rca(root)
|
||||
else:
|
||||
unified_graph = \
|
||||
self._get_rca_for_specific_project(ga,
|
||||
found_graph_in,
|
||||
found_graph_out,
|
||||
root,
|
||||
project_id,
|
||||
is_admin_project)
|
||||
db_nodes, db_edges = self.db.history_facade.alarm_rca(
|
||||
root,
|
||||
project_id=project_id,
|
||||
admin=is_admin_project)
|
||||
|
||||
alarms = unified_graph.get_vertices(query_dict=ALARMS_ALL_QUERY)
|
||||
unified_graph.update_vertices(alarms)
|
||||
for n in db_nodes:
|
||||
n.payload[HProps.START_TIMESTAMP] = str(n.start_timestamp)
|
||||
if n.end_timestamp <= db_time():
|
||||
n.payload[HProps.END_TIMESTAMP] = str(n.end_timestamp)
|
||||
|
||||
json_graph = unified_graph.json_output_graph(
|
||||
inspected_index=self._find_rca_index(unified_graph, root))
|
||||
vertices = [Vertex(vertex_id=n.vitrage_id, properties=n.payload) for n
|
||||
in db_nodes]
|
||||
edges = [Edge(source_id=e.source_id, target_id=e.target_id,
|
||||
label=e.label, properties=e.payload) for e in db_edges]
|
||||
rca_graph = NXGraph(vertices=vertices, edges=edges)
|
||||
|
||||
json_graph = rca_graph.json_output_graph(
|
||||
inspected_index=self._find_rca_index(rca_graph, root))
|
||||
|
||||
return json_graph
|
||||
|
||||
def _get_rca_for_specific_project(self,
|
||||
ga,
|
||||
found_graph_in,
|
||||
found_graph_out,
|
||||
root,
|
||||
project_id,
|
||||
is_admin_project):
|
||||
"""Filter the RCA for root entity with consideration of project_id
|
||||
|
||||
Filter the RCA for root by:
|
||||
1. filter the alarms deduced from the root alarm (found_graph_in)
|
||||
2. filter the alarms caused the root alarm (found_graph_out)
|
||||
And in the end unify 1 and 2
|
||||
|
||||
:type ga: NXAlgorithm
|
||||
:type found_graph_in: NXGraph
|
||||
:type found_graph_out: NXGraph
|
||||
:type root: string
|
||||
:type project_id: string
|
||||
:type is_admin_project: boolean
|
||||
:rtype: NXGraph
|
||||
"""
|
||||
|
||||
filtered_alarms_out = \
|
||||
self._filter_alarms(found_graph_out.get_vertices(), project_id)
|
||||
filtered_found_graph_out = ga.subgraph(
|
||||
[node.vertex_id for node in filtered_alarms_out])
|
||||
filtered_found_graph_in = \
|
||||
self._filter_rca_causing_entities(ga,
|
||||
found_graph_in,
|
||||
root,
|
||||
project_id,
|
||||
is_admin_project)
|
||||
filtered_found_graph_out.union(filtered_found_graph_in)
|
||||
|
||||
return filtered_found_graph_out
|
||||
|
||||
def _filter_rca_causing_entities(self,
|
||||
ga,
|
||||
rca_graph,
|
||||
root_id,
|
||||
project_id,
|
||||
is_admin_project):
|
||||
"""Filter the RCA entities which caused this alarm
|
||||
|
||||
Shows only the causing alarms which has the same project_id and also
|
||||
the first alarm that has a different project_id. In case the tenant is
|
||||
admin then project_id can also be None.
|
||||
|
||||
:type ga: NXAlgorithm
|
||||
:type rca_graph: NXGraph
|
||||
:type root_id: string
|
||||
:type project_id: string
|
||||
:type is_admin_project: boolean
|
||||
:rtype: NXGraph
|
||||
"""
|
||||
|
||||
entities = [root_id]
|
||||
current_entity_id = root_id
|
||||
|
||||
while len(rca_graph.neighbors(current_entity_id,
|
||||
direction=Direction.IN)) > 0:
|
||||
current_entity = rca_graph.neighbors(current_entity_id,
|
||||
direction=Direction.IN)[0]
|
||||
current_entity_id = current_entity.vertex_id
|
||||
entities.append(current_entity.vertex_id)
|
||||
if not self._is_alarm_of_current_project(current_entity,
|
||||
project_id,
|
||||
is_admin_project):
|
||||
break
|
||||
|
||||
return ga.subgraph(entities)
|
||||
|
||||
@staticmethod
|
||||
def _find_rca_index(found_graph, root):
|
||||
for root_index, vertex in enumerate(found_graph._g):
|
||||
|
@ -30,3 +30,6 @@ def purge_data():
|
||||
db.active_actions.delete()
|
||||
db.events.delete()
|
||||
db.graph_snapshots.delete()
|
||||
db.changes.delete()
|
||||
db.edges.delete()
|
||||
db.alarms.delete()
|
||||
|
@ -31,6 +31,7 @@ class VertexProperties(ElementProperties):
|
||||
VITRAGE_AGGREGATED_SEVERITY = 'vitrage_aggregated_severity'
|
||||
VITRAGE_OPERATIONAL_SEVERITY = 'vitrage_operational_severity'
|
||||
VITRAGE_RESOURCE_ID = 'vitrage_resource_id'
|
||||
VITRAGE_RESOURCE_PROJECT_ID = 'vitrage_resource_project_id'
|
||||
VITRAGE_CACHED_ID = 'vitrage_cached_id'
|
||||
ID = 'id'
|
||||
STATE = 'state'
|
||||
@ -49,6 +50,8 @@ class VertexProperties(ElementProperties):
|
||||
|
||||
|
||||
class EdgeProperties(ElementProperties):
|
||||
SOURCE_ID = 'source_id'
|
||||
TARGET_ID = 'target_id'
|
||||
RELATIONSHIP_TYPE = 'relationship_type'
|
||||
|
||||
|
||||
@ -118,9 +121,13 @@ class NotifierEventTypes(object):
|
||||
DEACTIVATE_DEDUCED_ALARM_EVENT = 'vitrage.deduced_alarm.deactivate'
|
||||
ACTIVATE_ALARM_EVENT = 'vitrage.alarm.activate'
|
||||
DEACTIVATE_ALARM_EVENT = 'vitrage.alarm.deactivate'
|
||||
CHANGE_IN_ALARM_EVENT = 'vitrage.alarm.change'
|
||||
CHANGE_PROJECT_ID_EVENT = 'vitrage.alarm.change_project_id'
|
||||
ACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.activate'
|
||||
DEACTIVATE_MARK_DOWN_EVENT = 'vitrage.mark_down.deactivate'
|
||||
EXECUTE_EXTERNAL_ACTION = 'vitrage.execute_external_action'
|
||||
ACTIVATE_CAUSAL_RELATION = 'vitrage.causal_relationship.activate'
|
||||
DEACTIVATE_CAUSAL_RELATION = 'vitrage.causal_relationship.deactivate'
|
||||
|
||||
|
||||
class TemplateTopologyFields(object):
|
||||
@ -182,3 +189,11 @@ class TenantProps(object):
|
||||
ALL_TENANTS = 'all_tenants'
|
||||
TENANT = 'tenant'
|
||||
IS_ADMIN = 'is_admin'
|
||||
|
||||
|
||||
class HistoryProps(object):
|
||||
VITRAGE_ID = 'vitrage_id'
|
||||
SOURCE_ID = 'source_id'
|
||||
TARGET_ID = 'target_id'
|
||||
START_TIMESTAMP = 'start_timestamp'
|
||||
END_TIMESTAMP = 'end_timestamp'
|
||||
|
@ -49,6 +49,29 @@ rules = [
|
||||
}
|
||||
]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name='list alarms history',
|
||||
check_str=base.UNPROTECTED,
|
||||
description='List the alarms history',
|
||||
operations=[
|
||||
{
|
||||
'path': '/alarm/history',
|
||||
'method': 'GET'
|
||||
}
|
||||
]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name='list alarms history:all_tenants',
|
||||
check_str=base.ROLE_ADMIN,
|
||||
description='List alarms history of all tenants '
|
||||
'(if the user has the permissions)',
|
||||
operations=[
|
||||
{
|
||||
'path': '/alarm/history',
|
||||
'method': 'GET'
|
||||
}
|
||||
]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
name='get alarms count',
|
||||
check_str=base.UNPROTECTED,
|
||||
|
@ -25,6 +25,7 @@ from vitrage.entity_graph import datasource_rpc as ds_rpc
|
||||
from vitrage.entity_graph import EVALUATOR_TOPIC
|
||||
from vitrage.entity_graph.graph_persistency import GraphPersistency
|
||||
from vitrage.entity_graph.processor.notifier import GraphNotifier
|
||||
from vitrage.entity_graph.processor.notifier import PersistNotifier
|
||||
from vitrage.entity_graph.processor.processor import Processor
|
||||
from vitrage.entity_graph.scheduler import Scheduler
|
||||
from vitrage.entity_graph.workers import GraphWorkersManager
|
||||
@ -67,12 +68,16 @@ class VitrageGraphInit(object):
|
||||
self.persist.replay_events(self.graph, graph_snapshot.event_id)
|
||||
self._recreate_transformers_id_cache()
|
||||
LOG.info("%s vertices loaded", self.graph.num_vertices())
|
||||
self.subscribe_presist_notifier()
|
||||
spawn(self._start_all_workers, is_snapshot=True)
|
||||
|
||||
def _start_from_scratch(self):
|
||||
LOG.info('Starting for the first time')
|
||||
LOG.info('Clearing database active_actions')
|
||||
self.db.active_actions.delete()
|
||||
LOG.info('Disabling previously active alarms')
|
||||
self.db.history_facade.disable_alarms_in_history()
|
||||
self.subscribe_presist_notifier()
|
||||
ds_rpc.get_all(
|
||||
ds_rpc.create_rpc_client_instance(self.conf),
|
||||
self.events_coordination,
|
||||
@ -118,6 +123,8 @@ class VitrageGraphInit(object):
|
||||
self.graph.subscribe(self.persist.persist_event,
|
||||
finalization=True)
|
||||
|
||||
def subscribe_presist_notifier(self):
|
||||
self.graph.subscribe(PersistNotifier(self.conf).notify_when_applicable)
|
||||
|
||||
PRIORITY_DELAY = 0.05
|
||||
|
||||
|
@ -14,14 +14,16 @@
|
||||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
|
||||
from vitrage.common.constants import EdgeLabel as ELabel
|
||||
from vitrage.common.constants import EdgeProperties as EProps
|
||||
from vitrage.common.constants import EntityCategory
|
||||
from vitrage.common.constants import NotifierEventTypes
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
from vitrage.evaluator.actions import evaluator_event_transformer as evaluator
|
||||
from vitrage.graph.driver.networkx_graph import edge_copy
|
||||
from vitrage.graph.driver.networkx_graph import vertex_copy
|
||||
from vitrage.messaging import get_transport
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@ -66,18 +68,9 @@ class GraphNotifier(object):
|
||||
return topics
|
||||
|
||||
def notify_when_applicable(self, before, current, is_vertex, graph):
|
||||
"""Callback subscribed to driver.graph updates
|
||||
|
||||
:param is_vertex:
|
||||
:param before: The graph element (vertex or edge) prior to the
|
||||
change that happened. None if the element was just created.
|
||||
:param current: The graph element (vertex or edge) after the
|
||||
change that happened. Deleted elements should arrive with the
|
||||
vitrage_is_deleted property set to True
|
||||
:param graph: The graph
|
||||
"""
|
||||
curr = current
|
||||
notification_types = _get_notification_type(before, curr, is_vertex)
|
||||
notification_types = \
|
||||
self._get_notification_type(before, curr, is_vertex)
|
||||
if not notification_types:
|
||||
return
|
||||
|
||||
@ -88,8 +81,8 @@ class GraphNotifier(object):
|
||||
curr.properties[VProps.RESOURCE] = graph.get_vertex(
|
||||
curr.get(VProps.VITRAGE_RESOURCE_ID))
|
||||
|
||||
LOG.info('notification_types : %s', str(notification_types))
|
||||
LOG.info('notification properties : %s', curr.properties)
|
||||
LOG.debug('notification_types : %s', str(notification_types))
|
||||
LOG.debug('notification properties : %s', curr.properties)
|
||||
|
||||
for notification_type in notification_types:
|
||||
try:
|
||||
@ -100,61 +93,156 @@ class GraphNotifier(object):
|
||||
except Exception:
|
||||
LOG.exception('Cannot notify - %s.', notification_type)
|
||||
|
||||
@staticmethod
|
||||
def _get_notification_type(before, current, is_vertex):
|
||||
if not is_vertex:
|
||||
return None
|
||||
|
||||
def _get_notification_type(before, current, is_vertex):
|
||||
if not is_vertex:
|
||||
return None
|
||||
|
||||
def notification_type(is_active,
|
||||
activate_event_type,
|
||||
deactivate_event_type):
|
||||
if not is_active(before):
|
||||
if is_active(current):
|
||||
return activate_event_type
|
||||
else:
|
||||
if not is_active(current):
|
||||
return deactivate_event_type
|
||||
|
||||
notification_types = [
|
||||
notification_type(_is_active_deduced_alarm,
|
||||
NotifierEventTypes.ACTIVATE_DEDUCED_ALARM_EVENT,
|
||||
NotifierEventTypes.DEACTIVATE_DEDUCED_ALARM_EVENT),
|
||||
notification_type(_is_active_alarm,
|
||||
NotifierEventTypes.ACTIVATE_ALARM_EVENT,
|
||||
NotifierEventTypes.DEACTIVATE_ALARM_EVENT),
|
||||
notification_type(_is_marked_down,
|
||||
NotifierEventTypes.ACTIVATE_MARK_DOWN_EVENT,
|
||||
NotifierEventTypes.DEACTIVATE_MARK_DOWN_EVENT),
|
||||
]
|
||||
return list(filter(None, notification_types))
|
||||
notification_types = [
|
||||
notification_type(
|
||||
before, current, _is_active_deduced_alarm,
|
||||
NotifierEventTypes.ACTIVATE_DEDUCED_ALARM_EVENT,
|
||||
NotifierEventTypes.DEACTIVATE_DEDUCED_ALARM_EVENT),
|
||||
notification_type(
|
||||
before, current, _is_active_alarm,
|
||||
NotifierEventTypes.ACTIVATE_ALARM_EVENT,
|
||||
NotifierEventTypes.DEACTIVATE_ALARM_EVENT),
|
||||
notification_type(
|
||||
before, current, _is_marked_down,
|
||||
NotifierEventTypes.ACTIVATE_MARK_DOWN_EVENT,
|
||||
NotifierEventTypes.DEACTIVATE_MARK_DOWN_EVENT),
|
||||
]
|
||||
return list(filter(None, notification_types))
|
||||
|
||||
|
||||
def _is_active_deduced_alarm(vertex):
|
||||
if not vertex:
|
||||
class PersistNotifier(object):
|
||||
"""Allows writing to message bus"""
|
||||
def __init__(self, conf):
|
||||
self.oslo_notifier = None
|
||||
topics = [conf.persistency.persistor_topic]
|
||||
self.oslo_notifier = oslo_messaging.Notifier(
|
||||
get_transport(conf),
|
||||
driver='messagingv2',
|
||||
publisher_id='vitrage.graph',
|
||||
topics=topics)
|
||||
|
||||
def notify_when_applicable(self, before, current, is_vertex, graph):
|
||||
|
||||
curr = current
|
||||
notification_types = \
|
||||
self._get_notification_type(before, curr, is_vertex)
|
||||
if not notification_types:
|
||||
return
|
||||
|
||||
# in case the event is on edge, add source and target ids to properties
|
||||
# for history
|
||||
if not is_vertex:
|
||||
curr = edge_copy(
|
||||
curr.source_id, curr.target_id, curr.label, curr.properties)
|
||||
curr.properties[EProps.SOURCE_ID] = curr.source_id
|
||||
curr.properties[EProps.TARGET_ID] = curr.target_id
|
||||
|
||||
LOG.debug('persist_notification_types : %s', str(notification_types))
|
||||
LOG.debug('persist_notification properties : %s', curr.properties)
|
||||
|
||||
for notification_type in notification_types:
|
||||
try:
|
||||
self.oslo_notifier.info(
|
||||
{},
|
||||
notification_type,
|
||||
curr.properties)
|
||||
except Exception:
|
||||
LOG.exception('Cannot notify - %s.', notification_type)
|
||||
|
||||
@staticmethod
|
||||
def _get_notification_type(before, current, is_vertex):
|
||||
|
||||
notification_types = [
|
||||
notification_type(
|
||||
before, current, _is_active_alarm,
|
||||
NotifierEventTypes.ACTIVATE_ALARM_EVENT,
|
||||
NotifierEventTypes.DEACTIVATE_ALARM_EVENT),
|
||||
notification_type(
|
||||
before, current, _is_active_causes_edge,
|
||||
NotifierEventTypes.ACTIVATE_CAUSAL_RELATION,
|
||||
NotifierEventTypes.DEACTIVATE_CAUSAL_RELATION),
|
||||
NotifierEventTypes.CHANGE_IN_ALARM_EVENT if
|
||||
_is_alarm_severity_change(before, current) else None,
|
||||
NotifierEventTypes.CHANGE_PROJECT_ID_EVENT if
|
||||
_is_resource_project_id_change(before, current) else None,
|
||||
]
|
||||
return list(filter(None, notification_types))
|
||||
|
||||
|
||||
def notification_type(before,
|
||||
current,
|
||||
is_active,
|
||||
activate_event_type,
|
||||
deactivate_event_type):
|
||||
if not is_active(before):
|
||||
if is_active(current):
|
||||
return activate_event_type
|
||||
else:
|
||||
if not is_active(current):
|
||||
return deactivate_event_type
|
||||
|
||||
|
||||
def _is_active_deduced_alarm(entity):
|
||||
if not entity:
|
||||
return False
|
||||
if vertex.get(VProps.VITRAGE_CATEGORY) == EntityCategory.ALARM and \
|
||||
vertex.get(VProps.VITRAGE_TYPE) == evaluator.VITRAGE_DATASOURCE:
|
||||
return _is_relevant_vertex(vertex)
|
||||
if entity.get(VProps.VITRAGE_CATEGORY) == EntityCategory.ALARM and \
|
||||
entity.get(VProps.VITRAGE_TYPE) == evaluator.VITRAGE_DATASOURCE:
|
||||
return _is_relevant_vertex(entity)
|
||||
return False
|
||||
|
||||
|
||||
def _is_active_alarm(vertex):
|
||||
if vertex and vertex.get(VProps.VITRAGE_CATEGORY) == EntityCategory.ALARM:
|
||||
return _is_relevant_vertex(vertex)
|
||||
def _is_active_alarm(entity):
|
||||
if entity and entity.get(VProps.VITRAGE_CATEGORY) == EntityCategory.ALARM:
|
||||
return _is_relevant_vertex(entity)
|
||||
return False
|
||||
|
||||
|
||||
def _is_marked_down(vertex):
|
||||
if not vertex:
|
||||
def _is_marked_down(entity):
|
||||
if not entity:
|
||||
return False
|
||||
if vertex.get(VProps.VITRAGE_CATEGORY) == EntityCategory.RESOURCE and \
|
||||
vertex.get(VProps.IS_MARKED_DOWN) is True:
|
||||
return _is_relevant_vertex(vertex)
|
||||
if entity.get(VProps.VITRAGE_CATEGORY) == EntityCategory.RESOURCE and \
|
||||
entity.get(VProps.IS_MARKED_DOWN) is True:
|
||||
return _is_relevant_vertex(entity)
|
||||
return False
|
||||
|
||||
|
||||
def _is_relevant_vertex(vertex):
|
||||
if vertex.get(VProps.VITRAGE_IS_DELETED, False) or \
|
||||
vertex.get(VProps.VITRAGE_IS_PLACEHOLDER, False):
|
||||
def _is_relevant_vertex(entity):
|
||||
if entity.get(VProps.VITRAGE_IS_DELETED, False) or \
|
||||
entity.get(VProps.VITRAGE_IS_PLACEHOLDER, False):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _is_active_causes_edge(entity):
|
||||
if not entity:
|
||||
return False
|
||||
if not entity.get(EProps.RELATIONSHIP_TYPE) == ELabel.CAUSES:
|
||||
return False
|
||||
return not entity.get(EProps.VITRAGE_IS_DELETED)
|
||||
|
||||
|
||||
def _is_alarm_severity_change(before, curr):
|
||||
if not (_is_active_alarm(before) and
|
||||
_is_active_alarm(curr)):
|
||||
return False
|
||||
# returns true on activation, deactivation and severity change
|
||||
if not before and curr \
|
||||
or (before.get(VProps.VITRAGE_AGGREGATED_SEVERITY) !=
|
||||
curr.get(VProps.VITRAGE_AGGREGATED_SEVERITY)):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _is_resource_project_id_change(before, curr):
|
||||
if not (_is_active_alarm(before) and
|
||||
_is_active_alarm(curr)):
|
||||
return False
|
||||
if (before.get(VProps.VITRAGE_RESOURCE_PROJECT_ID) !=
|
||||
curr.get(VProps.VITRAGE_RESOURCE_PROJECT_ID)):
|
||||
return True
|
||||
return False
|
||||
|
@ -14,7 +14,7 @@
|
||||
# under the License.
|
||||
from oslo_log import log
|
||||
|
||||
from vitrage.common.constants import EntityCategory
|
||||
from vitrage.common.constants import EntityCategory as ECategory
|
||||
from vitrage.common.constants import GraphAction
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
from vitrage.datasources.transformer_base import TransformerBase
|
||||
@ -318,22 +318,45 @@ class Processor(processor.ProcessorBase):
|
||||
result = self.entity_graph.get_vertices(attr)
|
||||
event[TransformerBase.QUERY_RESULT] = result
|
||||
|
||||
@staticmethod
|
||||
def _add_resource_details_to_alarm(vertex, neighbors):
|
||||
def _add_resource_details_to_alarm(self, vertex, neighbors):
|
||||
|
||||
if not vertex.get(VProps.VITRAGE_CATEGORY) == EntityCategory.ALARM \
|
||||
or not neighbors:
|
||||
if not neighbors:
|
||||
return
|
||||
|
||||
# for the possibility that alarm doesn't have resource
|
||||
vertex.properties[VProps.VITRAGE_RESOURCE_ID] = None
|
||||
vertex.properties[VProps.VITRAGE_RESOURCE_TYPE] = None
|
||||
resource = None
|
||||
alarms = []
|
||||
|
||||
for neighbor in neighbors:
|
||||
if vertex.get(VProps.VITRAGE_CATEGORY) == ECategory.ALARM:
|
||||
alarms = [vertex]
|
||||
for neighbor in neighbors:
|
||||
if neighbor.vertex.get(VProps.VITRAGE_CATEGORY) ==\
|
||||
ECategory.RESOURCE:
|
||||
resource = neighbor.vertex
|
||||
elif vertex.get(VProps.VITRAGE_CATEGORY) == ECategory.RESOURCE:
|
||||
resource = vertex
|
||||
for neighbor in neighbors:
|
||||
if neighbor.vertex.get(VProps.VITRAGE_CATEGORY) == \
|
||||
ECategory.ALARM:
|
||||
alarms.append(neighbor.vertex)
|
||||
|
||||
if neighbor.vertex.get(VProps.VITRAGE_CATEGORY) == \
|
||||
EntityCategory.RESOURCE:
|
||||
vertex.properties[VProps.VITRAGE_RESOURCE_ID] = \
|
||||
neighbor.vertex.vertex_id
|
||||
vertex.properties[VProps.VITRAGE_RESOURCE_TYPE] = \
|
||||
neighbor.vertex.get(VProps.VITRAGE_TYPE)
|
||||
for alarm in alarms:
|
||||
if not resource:
|
||||
self.add_resource_details(alarm, None, None, None)
|
||||
continue
|
||||
|
||||
project_id = resource.get(VProps.PROJECT_ID)
|
||||
if not project_id:
|
||||
n_vertex = self.entity_graph.get_vertex(resource.vertex_id)
|
||||
project_id = n_vertex.get(VProps.PROJECT_ID) \
|
||||
if n_vertex else None
|
||||
self.add_resource_details(
|
||||
alarm,
|
||||
r_id=resource.vertex_id,
|
||||
r_type=resource.get(VProps.VITRAGE_TYPE),
|
||||
r_project_id=project_id)
|
||||
|
||||
@staticmethod
|
||||
def add_resource_details(alarm, r_id, r_type, r_project_id):
|
||||
alarm[VProps.VITRAGE_RESOURCE_ID] = r_id
|
||||
alarm[VProps.VITRAGE_RESOURCE_TYPE] = r_type
|
||||
alarm[VProps.VITRAGE_RESOURCE_PROJECT_ID] = r_project_id
|
||||
|
@ -382,8 +382,8 @@ class ApiWorker(GraphCloneWorkerBase):
|
||||
server=rabbit_hosts)
|
||||
|
||||
endpoints = [TopologyApis(self._entity_graph, conf),
|
||||
AlarmApis(self._entity_graph, conf),
|
||||
RcaApis(self._entity_graph, conf),
|
||||
AlarmApis(self._entity_graph, conf, db),
|
||||
RcaApis(self._entity_graph, conf, db),
|
||||
TemplateApis(notifier, db),
|
||||
EventApis(conf),
|
||||
ResourceApis(self._entity_graph, conf),
|
||||
|
@ -29,6 +29,7 @@ def create_vertex(vitrage_id,
|
||||
entity_state=None,
|
||||
update_timestamp=None,
|
||||
project_id=None,
|
||||
vitrage_resource_project_id=None,
|
||||
metadata=None):
|
||||
"""A builder to create a vertex
|
||||
|
||||
@ -68,7 +69,8 @@ def create_vertex(vitrage_id,
|
||||
VConst.VITRAGE_SAMPLE_TIMESTAMP: vitrage_sample_timestamp,
|
||||
VConst.VITRAGE_IS_PLACEHOLDER: vitrage_is_placeholder,
|
||||
VConst.VITRAGE_ID: vitrage_id,
|
||||
VConst.PROJECT_ID: project_id
|
||||
VConst.PROJECT_ID: project_id,
|
||||
VConst.VITRAGE_RESOURCE_PROJECT_ID: vitrage_resource_project_id,
|
||||
}
|
||||
if metadata:
|
||||
properties.update(metadata)
|
||||
|
@ -18,4 +18,7 @@ OPTS = [
|
||||
cfg.StrOpt('persistor_topic',
|
||||
default='vitrage_persistor',
|
||||
help='persistor will listen on this topic for events to store'),
|
||||
]
|
||||
cfg.IntOpt('alarm_history_ttl',
|
||||
default=30,
|
||||
help='The number of days inactive alarms history is kept'),
|
||||
]
|
||||
|
@ -13,16 +13,27 @@
|
||||
# under the License.
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import cotyledon
|
||||
import dateutil.parser
|
||||
from futurist import periodics
|
||||
|
||||
from oslo_log import log
|
||||
import oslo_messaging as oslo_m
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from vitrage.common.constants import EdgeProperties as EProps
|
||||
from vitrage.common.constants import ElementProperties as ElementProps
|
||||
from vitrage.common.constants import HistoryProps as HProps
|
||||
from vitrage.common.constants import NotifierEventTypes as NETypes
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
from vitrage.common.utils import spawn
|
||||
from vitrage import messaging
|
||||
|
||||
from vitrage.storage.sqlalchemy import models
|
||||
from vitrage.utils.datetime import utcnow
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -58,16 +69,94 @@ class PersistorService(cotyledon.Service):
|
||||
|
||||
|
||||
class VitragePersistorEndpoint(object):
|
||||
|
||||
funcs = {}
|
||||
|
||||
def __init__(self, db_connection):
|
||||
self.db_connection = db_connection
|
||||
self.db = db_connection
|
||||
self.event_type_to_writer = {
|
||||
NETypes.ACTIVATE_ALARM_EVENT: self._persist_activated_alarm,
|
||||
NETypes.DEACTIVATE_ALARM_EVENT: self._persist_deactivate_alarm,
|
||||
NETypes.ACTIVATE_CAUSAL_RELATION: self._persist_activate_edge,
|
||||
NETypes.DEACTIVATE_CAUSAL_RELATION: self._persist_deactivate_edge,
|
||||
NETypes.CHANGE_IN_ALARM_EVENT: self._persist_change,
|
||||
NETypes.CHANGE_PROJECT_ID_EVENT: self._persist_alarm_proj_change,
|
||||
}
|
||||
|
||||
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
LOG.debug('Event_type: %s Payload %s', event_type, payload)
|
||||
if event_type and event_type in self.funcs.keys():
|
||||
self.funcs[event_type](self.db_connection, event_type, payload)
|
||||
self.process_event(event_type, payload)
|
||||
|
||||
def process_event(self, event_type, payload):
|
||||
writer = self.event_type_to_writer.get(event_type)
|
||||
if not writer:
|
||||
LOG.warning('Unrecognized event_type: %s', event_type)
|
||||
return
|
||||
writer(event_type, payload)
|
||||
|
||||
def _persist_activated_alarm(self, event_type, data):
|
||||
event_timestamp = self.event_time(data)
|
||||
|
||||
alarm_row = \
|
||||
models.Alarm(
|
||||
vitrage_id=data.get(VProps.VITRAGE_ID),
|
||||
start_timestamp=event_timestamp,
|
||||
name=data.get(VProps.NAME),
|
||||
vitrage_type=data.get(VProps.VITRAGE_TYPE),
|
||||
vitrage_aggregated_severity=data.get(
|
||||
VProps.VITRAGE_AGGREGATED_SEVERITY),
|
||||
project_id=data.get(VProps.PROJECT_ID),
|
||||
vitrage_resource_type=data.get(VProps.VITRAGE_RESOURCE_TYPE),
|
||||
vitrage_resource_id=data.get(VProps.VITRAGE_RESOURCE_ID),
|
||||
vitrage_resource_project_id=data.get(
|
||||
VProps.VITRAGE_RESOURCE_PROJECT_ID),
|
||||
payload=data)
|
||||
self.db.alarms.create(alarm_row)
|
||||
|
||||
def _persist_deactivate_alarm(self, event_type, data):
|
||||
vitrage_id = data.get(VProps.VITRAGE_ID)
|
||||
event_timestamp = self.event_time(data)
|
||||
self.db.alarms.update(
|
||||
vitrage_id, HProps.END_TIMESTAMP, event_timestamp)
|
||||
|
||||
def _persist_alarm_proj_change(self, event_type, data):
|
||||
vitrage_id = data.get(VProps.VITRAGE_ID)
|
||||
self.db.alarms.update(vitrage_id,
|
||||
VProps.VITRAGE_RESOURCE_PROJECT_ID,
|
||||
data.get(VProps.VITRAGE_RESOURCE_PROJECT_ID))
|
||||
|
||||
def _persist_activate_edge(self, event_type, data):
|
||||
event_timestamp = self.event_time(data)
|
||||
|
||||
edge_row = \
|
||||
models.Edge(
|
||||
source_id=data.get(EProps.SOURCE_ID),
|
||||
target_id=data.get(EProps.TARGET_ID),
|
||||
label=data.get(EProps.RELATIONSHIP_TYPE),
|
||||
start_timestamp=event_timestamp,
|
||||
payload=data)
|
||||
self.db.edges.create(edge_row)
|
||||
|
||||
def _persist_deactivate_edge(self, event_type, data):
|
||||
event_timestamp = self.event_time(data)
|
||||
source_id = data.get(EProps.SOURCE_ID)
|
||||
target_id = data.get(EProps.TARGET_ID)
|
||||
self.db.edges.update(
|
||||
source_id, target_id, end_timestamp=event_timestamp)
|
||||
|
||||
def _persist_change(self, event_type, data):
|
||||
event_timestamp = self.event_time(data)
|
||||
change_row = \
|
||||
models.Change(
|
||||
vitrage_id=data.get(VProps.VITRAGE_ID),
|
||||
timestamp=event_timestamp,
|
||||
severity=data.get(VProps.VITRAGE_AGGREGATED_SEVERITY),
|
||||
payload=data)
|
||||
self.db.changes.create(change_row)
|
||||
|
||||
@staticmethod
|
||||
def event_time(data):
|
||||
event_timestamp = \
|
||||
dateutil.parser.parse(data.get(ElementProps.UPDATE_TIMESTAMP))
|
||||
event_timestamp = timeutils.normalize_time(event_timestamp)
|
||||
return event_timestamp
|
||||
|
||||
|
||||
class Scheduler(object):
|
||||
@ -81,10 +170,11 @@ class Scheduler(object):
|
||||
self.periodic = periodics.PeriodicWorker.create(
|
||||
[], executor_factory=lambda: ThreadPoolExecutor(max_workers=10))
|
||||
|
||||
self.add_expirer_timer()
|
||||
self.add_events_table_expirer_timer()
|
||||
self.add_history_tables_expirer_timer()
|
||||
spawn(self.periodic.start)
|
||||
|
||||
def add_expirer_timer(self):
|
||||
def add_events_table_expirer_timer(self):
|
||||
spacing = 60
|
||||
|
||||
@periodics.periodic(spacing=spacing)
|
||||
@ -92,11 +182,27 @@ class Scheduler(object):
|
||||
try:
|
||||
event_id = self.db.graph_snapshots.query_snapshot_event_id()
|
||||
if event_id:
|
||||
LOG.debug('Expirer deleting event - id=%s', event_id)
|
||||
LOG.debug('Table events - deleting event id=%s', event_id)
|
||||
self.db.events.delete(event_id)
|
||||
|
||||
except Exception:
|
||||
LOG.exception('DB periodic cleanup run failed.')
|
||||
LOG.exception('Table events - periodic cleanup run failed.')
|
||||
|
||||
self.periodic.add(expirer_periodic)
|
||||
LOG.info("Database periodic cleanup starting (spacing=%ss)", spacing)
|
||||
LOG.info("Table events - periodic cleanup started (%ss)", spacing)
|
||||
|
||||
def add_history_tables_expirer_timer(self):
|
||||
spacing = 60
|
||||
|
||||
@periodics.periodic(spacing=spacing)
|
||||
def expirer_periodic():
|
||||
expire_by = \
|
||||
utcnow(with_timezone=False) - \
|
||||
timedelta(days=self.conf.persistency.alarm_history_ttl)
|
||||
try:
|
||||
self.db.alarms.delete_expired(expire_by)
|
||||
except Exception:
|
||||
LOG.exception('History tables - periodic cleanup run failed.')
|
||||
|
||||
self.periodic.add(expirer_periodic)
|
||||
LOG.info("History tables - periodic cleanup started (%ss)", spacing)
|
||||
|
@ -17,6 +17,8 @@ import six.moves.urllib.parse as urlparse
|
||||
from stevedore import driver
|
||||
import tenacity
|
||||
|
||||
from vitrage.utils.datetime import utcnow
|
||||
|
||||
_NAMESPACE = 'vitrage.storage'
|
||||
|
||||
|
||||
@ -50,3 +52,8 @@ def get_connection_from_config(conf):
|
||||
return mgr.driver(conf, url)
|
||||
|
||||
return _get_connection()
|
||||
|
||||
|
||||
def db_time():
|
||||
ret = utcnow(with_timezone=False)
|
||||
return ret.replace(microsecond=0)
|
||||
|
@ -43,6 +43,18 @@ class Connection(object):
|
||||
def webhooks(self):
|
||||
return None
|
||||
|
||||
@property
|
||||
def alarms(self):
|
||||
return None
|
||||
|
||||
@property
|
||||
def edges(self):
|
||||
return None
|
||||
|
||||
@property
|
||||
def changes(self):
|
||||
return None
|
||||
|
||||
@abc.abstractmethod
|
||||
def upgrade(self, nocreate=False):
|
||||
raise NotImplementedError('upgrade is not implemented')
|
||||
@ -231,6 +243,48 @@ class GraphSnapshotsConnection(object):
|
||||
"""
|
||||
raise NotImplementedError('query graph snapshot not implemented')
|
||||
|
||||
def delete(self, timestamp=None):
|
||||
def delete(self):
|
||||
"""Delete all graph snapshots taken until timestamp."""
|
||||
raise NotImplementedError('delete graph snapshots not implemented')
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class AlarmsConnection(object):
|
||||
def create(self, alarm):
|
||||
raise NotImplementedError('create alarm not implemented')
|
||||
|
||||
def update(self, vitrage_id, key, val):
|
||||
raise NotImplementedError('update alarms not implemented')
|
||||
|
||||
def end_all_alarms(self, end_time):
|
||||
raise NotImplementedError('end all alarms not implemented')
|
||||
|
||||
def delete(self, timestamp=None):
|
||||
raise NotImplementedError('delete alarms not implemented')
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class EdgesConnection(object):
|
||||
def create(self, edge):
|
||||
raise NotImplementedError('create edge not implemented')
|
||||
|
||||
def update(self, source_id, target_id, timestamp):
|
||||
raise NotImplementedError('update edge not implemented')
|
||||
|
||||
def end_all_edges(self, end_time):
|
||||
raise NotImplementedError('end all edges not implemented')
|
||||
|
||||
def delete(self):
|
||||
raise NotImplementedError('delete edges not implemented')
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ChangesConnection(object):
|
||||
def create(self, change):
|
||||
raise NotImplementedError('create change not implemented')
|
||||
|
||||
def add_end_changes(self, chnges_to_add, end_time):
|
||||
raise NotImplementedError('add end changes not implemented')
|
||||
|
||||
def delete(self):
|
||||
raise NotImplementedError('delete changes not implemented')
|
||||
|
435
vitrage/storage/history_facade.py
Normal file
435
vitrage/storage/history_facade.py
Normal file
@ -0,0 +1,435 @@
|
||||
# Copyright 2018 - Nokia
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from oslo_db.sqlalchemy import utils as sqlalchemyutils
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
import sqlalchemy
|
||||
from sqlalchemy import and_
|
||||
from sqlalchemy import or_
|
||||
from vitrage.common.constants import EdgeLabel as ELable
|
||||
from vitrage.common.constants import HistoryProps as HProps
|
||||
from vitrage.common.exception import VitrageInputError
|
||||
from vitrage.entity_graph.mappings.operational_alarm_severity import \
|
||||
OperationalAlarmSeverity as OSeverity
|
||||
from vitrage.storage import db_time
|
||||
from vitrage.storage.sqlalchemy import models
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
LIMIT = 10000
|
||||
ASC = 'asc'
|
||||
DESC = 'desc'
|
||||
|
||||
|
||||
class HistoryFacadeConnection(object):
|
||||
def __init__(self, engine_facade, alarms, edges, changes):
|
||||
self._engine_facade = engine_facade
|
||||
self._alarms = alarms
|
||||
self._edges = edges
|
||||
self._changes = changes
|
||||
|
||||
def disable_alarms_in_history(self):
|
||||
end_time = db_time()
|
||||
active_alarms = self.get_alarms(limit=0)
|
||||
changes_to_add = [alarm.vitrage_id for alarm in active_alarms]
|
||||
self._alarms.end_all_alarms(end_time)
|
||||
self._edges.end_all_edges(end_time)
|
||||
self._changes.add_end_changes(changes_to_add, end_time)
|
||||
|
||||
def count_active_alarms(self, project_id=None, is_admin_project=False):
|
||||
|
||||
session = self._engine_facade.get_session()
|
||||
query = session.query(models.Alarm)
|
||||
query = query.filter(models.Alarm.end_timestamp > db_time())
|
||||
query = self._add_project_filtering_to_query(
|
||||
query, project_id, is_admin_project)
|
||||
|
||||
query_severe = query.filter(
|
||||
models.Alarm.vitrage_aggregated_severity == OSeverity.SEVERE)
|
||||
query_critical = query.filter(
|
||||
models.Alarm.vitrage_aggregated_severity == OSeverity.CRITICAL)
|
||||
query_warning = query.filter(
|
||||
models.Alarm.vitrage_aggregated_severity == OSeverity.WARNING)
|
||||
query_ok = query.filter(
|
||||
models.Alarm.vitrage_aggregated_severity == OSeverity.OK)
|
||||
query_na = query.filter(
|
||||
models.Alarm.vitrage_aggregated_severity == OSeverity.NA)
|
||||
|
||||
counts = {OSeverity.SEVERE: query_severe.count(),
|
||||
OSeverity.CRITICAL: query_critical.count(),
|
||||
OSeverity.WARNING: query_warning.count(),
|
||||
OSeverity.OK: query_ok.count(),
|
||||
OSeverity.NA: query_na.count()}
|
||||
|
||||
return counts
|
||||
|
||||
def get_alarms(self,
|
||||
start=None,
|
||||
end=None,
|
||||
limit=LIMIT,
|
||||
sort_by=(HProps.START_TIMESTAMP, HProps.VITRAGE_ID),
|
||||
sort_dirs=(ASC, ASC),
|
||||
filter_by=None,
|
||||
filter_vals=None,
|
||||
next_page=True,
|
||||
marker=None,
|
||||
only_active_alarms=False,
|
||||
project_id=None,
|
||||
is_admin_project=False):
|
||||
"""Return alarms that match all filters sorted by the given keys.
|
||||
|
||||
Deleted alarms will be returned when only_active_alarms=False.
|
||||
|
||||
filtering and sorting are possible on each row of alarms table
|
||||
(pay attantion: it is not recommended to filter by start_timestamp
|
||||
and end_timestamp when start or end arguments are passed):
|
||||
vitrage_id,
|
||||
start_timestamp,
|
||||
end_timestamp,
|
||||
name,
|
||||
vitrage_type,
|
||||
vitrage_aggregated_severity,
|
||||
project_id,
|
||||
vitrage_resource_type,
|
||||
vitrage_resource_id,
|
||||
vitrage_resource_project_id,
|
||||
payload
|
||||
|
||||
Time Frame:
|
||||
start and end arguments gives the time frame for required alarms.
|
||||
Required format is the format that can be parsed by timeutils library.
|
||||
If both arguments are given, returned alarms are the alarms that
|
||||
where active sometime during given time frame
|
||||
(including active and inactive alarms):
|
||||
|
||||
1. start_ts------------end_ts
|
||||
2. start_ts------------end_ts
|
||||
3. start_ts------------end_ts
|
||||
4. start_ts---------------------------------------end_ts
|
||||
start end
|
||||
|_______________________________|
|
||||
|
||||
If only start is given, all alarms that started after this time
|
||||
will be returned (including active and inactive alarms):
|
||||
1. start_ts------------end_ts
|
||||
2. start_ts------
|
||||
start now
|
||||
|_______________________________|
|
||||
|
||||
note1: end argument can't be used without start argument
|
||||
note2: time frame can't be used with flag only_active_alarms=True
|
||||
|
||||
Filtering:
|
||||
filter_by represents parameters to filter on,
|
||||
and filter_vals contains the values to filter on in corresponding
|
||||
order to the order of parameters in filter_by.
|
||||
The filtering is according to SQL 'like' statement.
|
||||
It's possible to filter on each row of alarms table
|
||||
The filtering is also possible on list of values.
|
||||
|
||||
examples:
|
||||
1. In the following example:
|
||||
| filter_by = ['vitrage_type', 'vitrage_resource_type']
|
||||
| filter_vals = ['zabbix', 'nova']
|
||||
which will be evaluated to:
|
||||
Alarm.vitrage_type like '%zabbix%'
|
||||
and Alarm.vitrage_resource_type like '%nova%'
|
||||
Tthe filtering will be done so the query returns all the alarms
|
||||
in the DB with vitrage type containing the string 'zabbix'
|
||||
and vitrage resource type containing the string 'nova'
|
||||
|
||||
2. Following example is filtering list of values for one same property:
|
||||
| filter_by = ['vitrage_type', 'vitrage_id']
|
||||
| filter_vals = ['zabbix', ['123', '456', '789']]
|
||||
It will be evaluated to:
|
||||
Alarm.vitrage_type like '%zabbix%'
|
||||
and Alarm.vitrage_resource_type like '%123%'
|
||||
or like '%456%'
|
||||
or like '%789%'
|
||||
Tthe filtering will be done so the query returns all the alarms
|
||||
in the DB with vitrage type containing the string 'zabbix'
|
||||
and with one of vitrage_ids that are in the list in filter_vals[1]
|
||||
|
||||
|
||||
:param start: start of time frame
|
||||
:param end: end of time frame
|
||||
:param limit: maximum number of items to return,
|
||||
if limit=0 the method will return all matched items in alarms table,
|
||||
if limit is bigger then default parameter LIMIT, the number of items
|
||||
that will be returned will be defined by the default parameter LIMIT
|
||||
:param sort_by: array of attributes by which results should be sorted
|
||||
:param sort_dirs: per-column array of sort_dirs,
|
||||
corresponding to sort_keys ('asc' or 'desc').
|
||||
:param filter_by: array of attributes by which results will be filtered
|
||||
:param filter_vals: per-column array of filter values
|
||||
corresponding to filter_by
|
||||
:param next_page: if True will return next page when marker is given,
|
||||
if False will return previous page when marker is given,
|
||||
otherwise, returns first page if no marker was given.
|
||||
:param marker: if None returns first page, else if vitrage_id is given
|
||||
and next_page is True, return next #limit results after marker,
|
||||
else, if next page is False,return #limit results before marker.
|
||||
:param only_active_alarms: if True, returns only active alarms,
|
||||
if False return active and non-active alarms.
|
||||
:param project_id: if None there is no filtering by project_id
|
||||
(equals to All Tenants=True),
|
||||
if id is given, query will be fillter alarms by project id.
|
||||
:param is_admin_project: True to return alarms with
|
||||
project_id=None or resource_project_id=None
|
||||
"""
|
||||
|
||||
session = self._engine_facade.get_session()
|
||||
query = session.query(models.Alarm)
|
||||
query = self._add_project_filtering_to_query(
|
||||
query, project_id, is_admin_project)
|
||||
|
||||
self.assert_args(start, end, filter_by, filter_vals,
|
||||
only_active_alarms, sort_dirs)
|
||||
|
||||
if only_active_alarms:
|
||||
query = query.filter(models.Alarm.end_timestamp > db_time())
|
||||
elif (start and end) or start:
|
||||
query = self._add_time_frame_to_query(query, start, end)
|
||||
|
||||
query = self._add_filtering_to_query(query, filter_by, filter_vals)
|
||||
|
||||
if limit:
|
||||
query = self._generate_alarms_paginate_query(query,
|
||||
limit,
|
||||
sort_by,
|
||||
sort_dirs,
|
||||
next_page,
|
||||
marker)
|
||||
elif limit == 0:
|
||||
sort_dir_func = {
|
||||
ASC: sqlalchemy.asc,
|
||||
DESC: sqlalchemy.desc,
|
||||
}
|
||||
for i in range(len(sort_by)):
|
||||
query.order_by(sort_dir_func[sort_dirs[i]](
|
||||
getattr(models.Alarm, sort_by[i])))
|
||||
return query.all()
|
||||
|
||||
@staticmethod
|
||||
def assert_args(start,
|
||||
end,
|
||||
filter_by,
|
||||
filter_vals,
|
||||
only_active_alarms,
|
||||
sort_dirs):
|
||||
if only_active_alarms and (start or end):
|
||||
raise VitrageInputError("'only_active_alarms' can't be used "
|
||||
"with 'start' or 'end' ")
|
||||
if end and not start:
|
||||
raise VitrageInputError("'end' can't be used without 'start'")
|
||||
if (filter_by and not filter_vals) or (filter_vals and not filter_by):
|
||||
raise VitrageInputError('Cannot perform filtering, one of '
|
||||
'filter_by or filter_vals are missing')
|
||||
if filter_by and filter_vals and len(filter_by) != len(filter_vals):
|
||||
raise VitrageInputError("Cannot perform filtering, len of "
|
||||
"'filter_by' and 'filter_vals' differs")
|
||||
for d in sort_dirs:
|
||||
if d not in (ASC, DESC):
|
||||
raise VitrageInputError("Unknown sort direction %s", str(d))
|
||||
|
||||
@staticmethod
|
||||
def _add_time_frame_to_query(query, start, end):
|
||||
start = timeutils.normalize_time(start)
|
||||
end = timeutils.normalize_time(end)
|
||||
if start and end:
|
||||
query = \
|
||||
query.filter(
|
||||
or_(and_(models.Alarm.start_timestamp >= start,
|
||||
models.Alarm.start_timestamp <= end),
|
||||
and_(models.Alarm.end_timestamp >= start,
|
||||
models.Alarm.end_timestamp <= end),
|
||||
and_(models.Alarm.start_timestamp <= start,
|
||||
models.Alarm.end_timestamp >= end)))
|
||||
elif start:
|
||||
query = query.filter(models.Alarm.end_timestamp >= start)
|
||||
return query
|
||||
|
||||
@staticmethod
|
||||
def _add_project_filtering_to_query(query, project_id=None,
|
||||
is_admin_project=False):
|
||||
|
||||
if project_id:
|
||||
if is_admin_project:
|
||||
query = query.filter(or_(
|
||||
or_(models.Alarm.project_id == project_id,
|
||||
models.Alarm.vitrage_resource_project_id ==
|
||||
project_id),
|
||||
and_(
|
||||
or_(
|
||||
models.Alarm.project_id == project_id,
|
||||
models.Alarm.project_id == None),
|
||||
or_(
|
||||
models.Alarm.vitrage_resource_project_id ==
|
||||
project_id,
|
||||
models.Alarm.vitrage_resource_project_id == None)
|
||||
))) # noqa
|
||||
else:
|
||||
query = query.filter(
|
||||
or_(models.Alarm.project_id == project_id,
|
||||
models.Alarm.vitrage_resource_project_id ==
|
||||
project_id))
|
||||
return query
|
||||
|
||||
@staticmethod
|
||||
def _add_filtering_to_query(query, filter_by, filter_vals):
|
||||
|
||||
if not (filter_by or filter_vals):
|
||||
return query
|
||||
|
||||
for i in range(len(filter_by)):
|
||||
key = filter_by[i]
|
||||
val = filter_vals[i]
|
||||
val = val if val and type(val) == list else [val]
|
||||
cond = or_(*[getattr(models.Alarm, key).like(
|
||||
'%' + val[j] + '%') for j in range(len(val))])
|
||||
query = query.filter(cond)
|
||||
return query
|
||||
|
||||
def _generate_alarms_paginate_query(self,
|
||||
query,
|
||||
limit,
|
||||
sort_by,
|
||||
sort_dirs,
|
||||
next_page,
|
||||
marker):
|
||||
|
||||
limit = min(int(limit), LIMIT)
|
||||
|
||||
if marker:
|
||||
session = self._engine_facade.get_session()
|
||||
marker = session.query(models.Alarm). \
|
||||
filter(models.Alarm.vitrage_id ==
|
||||
marker).first()
|
||||
|
||||
if HProps.VITRAGE_ID not in sort_by:
|
||||
sort_by.append(HProps.VITRAGE_ID)
|
||||
sort_dirs.append(ASC)
|
||||
|
||||
if not next_page and marker: # 'not next_page' means previous page
|
||||
marker = self._create_marker_for_prev(
|
||||
query, limit, sort_by, sort_dirs, marker)
|
||||
|
||||
query = sqlalchemyutils.paginate_query(query,
|
||||
models.Alarm,
|
||||
limit,
|
||||
sort_by,
|
||||
sort_dirs=sort_dirs,
|
||||
marker=marker)
|
||||
return query
|
||||
|
||||
@staticmethod
|
||||
def _create_marker_for_prev(query, limit, sort_by, sort_dirs, marker):
|
||||
|
||||
dirs = [DESC if d == ASC else ASC for d in sort_dirs]
|
||||
query = sqlalchemyutils.paginate_query(query,
|
||||
models.Alarm,
|
||||
limit + 1,
|
||||
sort_by,
|
||||
marker=marker,
|
||||
sort_dirs=dirs)
|
||||
|
||||
alarms = query.all()
|
||||
if len(alarms) < limit + 1:
|
||||
new_marker = None
|
||||
else:
|
||||
new_marker = alarms[-1]
|
||||
|
||||
return new_marker
|
||||
|
||||
def alarm_rca(self,
|
||||
alarm_id,
|
||||
forward=True,
|
||||
backward=True,
|
||||
depth=None,
|
||||
project_id=None,
|
||||
admin=False):
|
||||
|
||||
n_result_f = []
|
||||
e_result_f = []
|
||||
if forward:
|
||||
n_result_f, e_result_f = \
|
||||
self._bfs(alarm_id, self._out_rca, depth, admin=admin,
|
||||
project_id=project_id)
|
||||
|
||||
n_result_b = []
|
||||
e_result_b = []
|
||||
if backward:
|
||||
n_result_b, e_result_b = \
|
||||
self._bfs(alarm_id, self._in_rca, depth, admin=admin,
|
||||
project_id=project_id)
|
||||
|
||||
n_result = self.get_alarms(limit=0,
|
||||
filter_by=[HProps.VITRAGE_ID],
|
||||
filter_vals=[n_result_f + n_result_b])
|
||||
|
||||
e_result = e_result_f + e_result_b
|
||||
|
||||
return n_result, e_result
|
||||
|
||||
def _rca_edges(self, filter_by, a_ids, proj_id, admin):
|
||||
alarm_ids = [str(alarm) for alarm in a_ids]
|
||||
session = self._engine_facade.get_session()
|
||||
query = session.query(models.Edge)\
|
||||
.filter(and_(getattr(models.Edge, filter_by).in_(alarm_ids),
|
||||
models.Edge.label == ELable.CAUSES))
|
||||
|
||||
query = query.join(models.Edge.target)
|
||||
query = self._add_project_filtering_to_query(query, proj_id, admin)
|
||||
|
||||
return query.all()
|
||||
|
||||
def _out_rca(self, sources, proj_id, admin):
|
||||
return self._rca_edges(HProps.SOURCE_ID, sources, proj_id, admin)
|
||||
|
||||
def _in_rca(self, targets, proj_id, admin):
|
||||
return self._rca_edges(HProps.TARGET_ID, targets, proj_id, admin)
|
||||
|
||||
def _bfs(self, alarm_id, neighbors_func,
|
||||
depth=None,
|
||||
project_id=None,
|
||||
admin=False):
|
||||
n_result = []
|
||||
visited_nodes = set()
|
||||
n_result.append(alarm_id)
|
||||
e_result = []
|
||||
curr_depth = 0
|
||||
nodes_q = {curr_depth: [alarm_id]}
|
||||
while nodes_q:
|
||||
node_ids = nodes_q.pop(curr_depth)
|
||||
if depth and curr_depth >= depth:
|
||||
break
|
||||
for node_id in node_ids:
|
||||
if node_id in visited_nodes:
|
||||
node_ids.remove(node_id)
|
||||
visited_nodes.update(node_ids)
|
||||
e_list = neighbors_func(node_ids, project_id, admin)
|
||||
n_list = \
|
||||
[edge.target_id if edge.source_id in node_ids
|
||||
else edge.source_id for edge in e_list]
|
||||
n_result.extend(n_list)
|
||||
e_result.extend(e_list)
|
||||
if n_list:
|
||||
curr_depth += 1
|
||||
nodes_q[curr_depth] = n_list
|
||||
|
||||
return n_result, e_result
|
@ -17,12 +17,17 @@ from __future__ import absolute_import
|
||||
|
||||
from oslo_db.sqlalchemy import session as db_session
|
||||
from oslo_log import log
|
||||
from sqlalchemy import and_
|
||||
from sqlalchemy.engine import url as sqlalchemy_url
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy import or_
|
||||
|
||||
from vitrage.common.exception import VitrageInputError
|
||||
from vitrage.entity_graph.mappings.operational_alarm_severity import \
|
||||
OperationalAlarmSeverity
|
||||
from vitrage import storage
|
||||
from vitrage.storage import base
|
||||
from vitrage.storage.history_facade import HistoryFacadeConnection
|
||||
from vitrage.storage.sqlalchemy import models
|
||||
from vitrage.storage.sqlalchemy.models import Template
|
||||
|
||||
@ -47,6 +52,14 @@ class Connection(base.Connection):
|
||||
self._graph_snapshots = GraphSnapshotsConnection(self._engine_facade)
|
||||
self._webhooks = WebhooksConnection(
|
||||
self._engine_facade)
|
||||
self._alarms = AlarmsConnection(
|
||||
self._engine_facade)
|
||||
self._edges = EdgesConnection(
|
||||
self._engine_facade)
|
||||
self._changes = ChangesConnection(
|
||||
self._engine_facade)
|
||||
self._history_facade = HistoryFacadeConnection(
|
||||
self._engine_facade, self._alarms, self._edges, self._changes)
|
||||
|
||||
@property
|
||||
def webhooks(self):
|
||||
@ -68,6 +81,22 @@ class Connection(base.Connection):
|
||||
def graph_snapshots(self):
|
||||
return self._graph_snapshots
|
||||
|
||||
@property
|
||||
def alarms(self):
|
||||
return self._alarms
|
||||
|
||||
@property
|
||||
def edges(self):
|
||||
return self._edges
|
||||
|
||||
@property
|
||||
def changes(self):
|
||||
return self._changes
|
||||
|
||||
@property
|
||||
def history_facade(self):
|
||||
return self._history_facade
|
||||
|
||||
@staticmethod
|
||||
def _dress_url(url):
|
||||
# If no explicit driver has been set, we default to pymysql
|
||||
@ -97,7 +126,10 @@ class Connection(base.Connection):
|
||||
models.Template.__table__,
|
||||
models.Webhooks.__table__,
|
||||
models.Event.__table__,
|
||||
models.GraphSnapshot.__table__])
|
||||
models.GraphSnapshot.__table__,
|
||||
models.Alarm.__table__,
|
||||
models.Edge.__table__,
|
||||
models.Change.__table__])
|
||||
# TODO(idan_hefetz) upgrade logic is missing
|
||||
|
||||
def disconnect(self):
|
||||
@ -359,3 +391,112 @@ class GraphSnapshotsConnection(base.GraphSnapshotsConnection, BaseTableConn):
|
||||
"""Delete all graph snapshots"""
|
||||
query = self.query_filter(models.GraphSnapshot)
|
||||
query.delete()
|
||||
|
||||
|
||||
class AlarmsConnection(base.AlarmsConnection, BaseTableConn):
|
||||
def __init__(self, engine_facade):
|
||||
super(AlarmsConnection, self).__init__(engine_facade)
|
||||
|
||||
def create(self, alarm):
|
||||
session = self._engine_facade.get_session()
|
||||
with session.begin():
|
||||
session.add(alarm)
|
||||
|
||||
def update(self, vitrage_id, key, val):
|
||||
session = self._engine_facade.get_session()
|
||||
with session.begin():
|
||||
query = session.query(models.Alarm).filter(
|
||||
models.Alarm.vitrage_id == vitrage_id)
|
||||
query.update({getattr(models.Alarm, key): val})
|
||||
|
||||
def end_all_alarms(self, end_time):
|
||||
session = self._engine_facade.get_session()
|
||||
query = session.query(models.Alarm).filter(
|
||||
models.Alarm.end_timestamp > end_time)
|
||||
query.update({models.Alarm.end_timestamp: end_time})
|
||||
|
||||
def delete_expired(self, expire_by=None):
|
||||
session = self._engine_facade.get_session()
|
||||
query = session.query(models.Alarm)
|
||||
query = query.filter(models.Alarm.end_timestamp < expire_by)
|
||||
return query.delete()
|
||||
|
||||
def delete(self,
|
||||
vitrage_id=None,
|
||||
start_timestamp=None,
|
||||
end_timestamp=None):
|
||||
query = self.query_filter(
|
||||
models.Alarm,
|
||||
vitrage_id=vitrage_id,
|
||||
start_timestamp=start_timestamp,
|
||||
end_timestamp=end_timestamp)
|
||||
return query.delete()
|
||||
|
||||
|
||||
class EdgesConnection(base.EdgesConnection, BaseTableConn):
|
||||
def __init__(self, engine_facade):
|
||||
super(EdgesConnection, self).__init__(engine_facade)
|
||||
|
||||
def create(self, edge):
|
||||
session = self._engine_facade.get_session()
|
||||
with session.begin():
|
||||
session.add(edge)
|
||||
|
||||
def update(self, source_id, target_id, end_timestamp):
|
||||
session = self._engine_facade.get_session()
|
||||
with session.begin():
|
||||
query = session.query(models.Edge).filter(and_(
|
||||
models.Edge.source_id == source_id,
|
||||
models.Edge.target_id == target_id))
|
||||
query.update({models.Edge.end_timestamp: end_timestamp})
|
||||
|
||||
def end_all_edges(self, end_time):
|
||||
session = self._engine_facade.get_session()
|
||||
query = session.query(models.Edge).filter(
|
||||
models.Edge.end_timestamp > end_time)
|
||||
query.update({models.Edge.end_timestamp: end_time})
|
||||
|
||||
def delete(self):
|
||||
query = self.query_filter(models.Edge)
|
||||
return query.delete()
|
||||
|
||||
|
||||
class ChangesConnection(base.ChangesConnection, BaseTableConn):
|
||||
def __init__(self, engine_facade):
|
||||
super(ChangesConnection, self).__init__(engine_facade)
|
||||
|
||||
def create(self, change):
|
||||
session = self._engine_facade.get_session()
|
||||
with session.begin():
|
||||
session.add(change)
|
||||
|
||||
def add_end_changes(self, vitrage_ids, end_time):
|
||||
last_changes = self._get_alarms_last_change(vitrage_ids)
|
||||
for id, change in last_changes.items():
|
||||
change_row = \
|
||||
models.Change(
|
||||
vitrage_id=id,
|
||||
timestamp=end_time,
|
||||
severity=OperationalAlarmSeverity.OK,
|
||||
payload=change.payload)
|
||||
self.create(change_row)
|
||||
|
||||
def _get_alarms_last_change(self, alarm_ids):
|
||||
session = self._engine_facade.get_session()
|
||||
query = session.query(func.max(models.Change.timestamp),
|
||||
models.Change.vitrage_id,
|
||||
models.Change.payload).\
|
||||
filter(models.Change.vitrage_id.in_(alarm_ids)).\
|
||||
group_by(models.Change.vitrage_id)
|
||||
|
||||
rows = query.all()
|
||||
|
||||
result = {}
|
||||
for change in rows:
|
||||
result[change.vitrage_id] = change
|
||||
|
||||
return result
|
||||
|
||||
def delete(self):
|
||||
query = self.query_filter(models.Change)
|
||||
return query.delete()
|
||||
|
@ -11,18 +11,22 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
import json
|
||||
import zlib
|
||||
|
||||
from oslo_db.sqlalchemy import models
|
||||
|
||||
from sqlalchemy import Column, INTEGER, String, \
|
||||
SmallInteger, BigInteger, Index, Boolean
|
||||
from sqlalchemy import Column, DateTime, INTEGER, String, \
|
||||
SmallInteger, BigInteger, Index, Boolean, ForeignKey
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
|
||||
from sqlalchemy.orm import relationship
|
||||
import sqlalchemy.types as types
|
||||
|
||||
|
||||
DEFAULT_END_TIME = datetime.datetime(2222, 2, 22, 22, 22, 22)
|
||||
|
||||
|
||||
class VitrageBase(models.TimestampMixin, models.ModelBase):
|
||||
"""Base class for Vitrage Models."""
|
||||
__table_args__ = {'mysql_charset': "utf8",
|
||||
@ -44,6 +48,22 @@ class VitrageBase(models.TimestampMixin, models.ModelBase):
|
||||
Base = declarative_base(cls=VitrageBase)
|
||||
|
||||
|
||||
class AutoIncrementInteger(types.TypeDecorator):
|
||||
impl = types.INT
|
||||
count = 0
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
value = self.count
|
||||
self.count += 1
|
||||
return value
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
return value
|
||||
|
||||
|
||||
MagicBigInt = types.BigInteger().with_variant(AutoIncrementInteger, 'sqlite')
|
||||
|
||||
|
||||
class JSONEncodedDict(types.TypeDecorator):
|
||||
"""Represents an immutable structure as a json-encoded string"""
|
||||
|
||||
@ -204,3 +224,118 @@ class Webhooks(Base):
|
||||
self.headers,
|
||||
self.regex_filter
|
||||
)
|
||||
|
||||
|
||||
class Alarm(Base):
|
||||
|
||||
__tablename__ = 'alarms'
|
||||
|
||||
vitrage_id = Column(String(128), primary_key=True)
|
||||
start_timestamp = Column(DateTime, index=True, nullable=False)
|
||||
end_timestamp = Column(DateTime, index=True, nullable=False,
|
||||
default=DEFAULT_END_TIME)
|
||||
name = Column(String(256), nullable=False)
|
||||
vitrage_type = Column(String(64), nullable=False)
|
||||
vitrage_aggregated_severity = Column(String(64), index=True,
|
||||
nullable=False)
|
||||
project_id = Column(String(64), index=True)
|
||||
vitrage_resource_type = Column(String(64))
|
||||
vitrage_resource_id = Column(String(64))
|
||||
vitrage_resource_project_id = Column(String(64), index=True)
|
||||
payload = Column(JSONEncodedDict())
|
||||
|
||||
def __repr__(self):
|
||||
return \
|
||||
"<Alarm(" \
|
||||
"vitrage_id='%s', " \
|
||||
"start_timestamp='%s', " \
|
||||
"end_timestamp='%s'," \
|
||||
"name='%s'," \
|
||||
"vitrage_type='%s'," \
|
||||
"vitrage_aggregated_severity='%s'," \
|
||||
"project_id='%s'," \
|
||||
"vitrage_resource_type='%s'," \
|
||||
"vitrage_resource_id='%s'," \
|
||||
"vitrage_resource_project_id='%s'," \
|
||||
"payload='%s')>" % \
|
||||
(
|
||||
self.vitrage_id,
|
||||
self.start_timestamp,
|
||||
self.end_timestamp,
|
||||
self.name,
|
||||
self.vitrage_type,
|
||||
self.vitrage_aggregated_severity,
|
||||
self.project_id,
|
||||
self.vitrage_resource_type,
|
||||
self.vitrage_resource_id,
|
||||
self.vitrage_resource_project_id,
|
||||
self.payload
|
||||
)
|
||||
|
||||
|
||||
class Edge(Base):
|
||||
|
||||
__tablename__ = 'edges'
|
||||
|
||||
source_id = Column(String(128),
|
||||
ForeignKey('alarms.vitrage_id', ondelete='CASCADE'),
|
||||
primary_key=True)
|
||||
target_id = Column(String(128),
|
||||
ForeignKey('alarms.vitrage_id', ondelete='CASCADE'),
|
||||
primary_key=True)
|
||||
label = Column(String(64), nullable=False)
|
||||
start_timestamp = Column(DateTime, nullable=False)
|
||||
end_timestamp = Column(DateTime, nullable=False, default=DEFAULT_END_TIME)
|
||||
payload = Column(JSONEncodedDict())
|
||||
|
||||
source = relationship("Alarm", foreign_keys=[source_id])
|
||||
target = relationship("Alarm", foreign_keys=[target_id])
|
||||
|
||||
def __repr__(self):
|
||||
return \
|
||||
"<Edge(" \
|
||||
"source_id='%s', " \
|
||||
"target_id='%s', " \
|
||||
"label='%s', " \
|
||||
"start_timestamp='%s'," \
|
||||
"end_timestamp='%s',"\
|
||||
"payload='%s)>" % \
|
||||
(
|
||||
self.source_id,
|
||||
self.target_id,
|
||||
self.label,
|
||||
self.start_timestamp,
|
||||
self.end_timestamp,
|
||||
self.payload
|
||||
)
|
||||
|
||||
|
||||
class Change(Base):
|
||||
|
||||
__tablename__ = 'changes'
|
||||
|
||||
id = Column(MagicBigInt, primary_key=True, autoincrement=True)
|
||||
vitrage_id = Column(String(128),
|
||||
ForeignKey('alarms.vitrage_id', ondelete='CASCADE'),
|
||||
index=True, nullable=False)
|
||||
timestamp = Column(DateTime, index=True, nullable=False)
|
||||
severity = Column(String(64), index=True, nullable=False)
|
||||
payload = Column(JSONEncodedDict())
|
||||
|
||||
alarm_id = relationship("Alarm", foreign_keys=[vitrage_id])
|
||||
|
||||
def __repr__(self):
|
||||
return \
|
||||
"<Change(" \
|
||||
"id='%s', " \
|
||||
"vitrage_id='%s', " \
|
||||
"timestamp='%s', " \
|
||||
"severity='%s'," \
|
||||
"payload='%s')>" % \
|
||||
(
|
||||
self.id,
|
||||
self.vitrage_id,
|
||||
self.timestamp,
|
||||
self.severity,
|
||||
self.payload
|
||||
)
|
||||
|
@ -16,11 +16,14 @@ import json
|
||||
|
||||
from testtools import matchers
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from vitrage.api_handler.apis.alarm import AlarmApis
|
||||
from vitrage.api_handler.apis.rca import RcaApis
|
||||
from vitrage.api_handler.apis.resource import ResourceApis
|
||||
from vitrage.api_handler.apis.topology import TopologyApis
|
||||
from vitrage.common.constants import EdgeLabel
|
||||
from vitrage.common.constants import EdgeProperties
|
||||
from vitrage.common.constants import EntityCategory
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
from vitrage.datasources import NOVA_HOST_DATASOURCE
|
||||
@ -30,18 +33,29 @@ from vitrage.datasources.transformer_base \
|
||||
import create_cluster_placeholder_vertex
|
||||
from vitrage.entity_graph.mappings.operational_alarm_severity import \
|
||||
OperationalAlarmSeverity
|
||||
from vitrage.entity_graph.processor.notifier import PersistNotifier
|
||||
from vitrage.graph.driver.networkx_graph import edge_copy
|
||||
from vitrage.graph.driver.networkx_graph import NXGraph
|
||||
import vitrage.graph.utils as graph_utils
|
||||
from vitrage.persistency.service import VitragePersistorEndpoint
|
||||
from vitrage.tests.base import IsEmpty
|
||||
from vitrage.tests.functional.test_configuration import TestConfiguration
|
||||
from vitrage.tests.unit.entity_graph.base import TestEntityGraphUnitBase
|
||||
from vitrage.utils.datetime import utcnow
|
||||
|
||||
|
||||
class TestApis(TestEntityGraphUnitBase):
|
||||
class TestApis(TestEntityGraphUnitBase, TestConfiguration):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestApis, cls).setUpClass()
|
||||
cls.conf = cfg.ConfigOpts()
|
||||
cls.add_db(cls.conf)
|
||||
|
||||
def test_get_alarms_with_admin_project(self):
|
||||
# Setup
|
||||
graph = self._create_graph()
|
||||
apis = AlarmApis(graph, None)
|
||||
apis = AlarmApis(graph, self.conf, self._db)
|
||||
ctx = {'tenant': 'project_1', 'is_admin': True}
|
||||
|
||||
# Action
|
||||
@ -55,7 +69,7 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
def test_get_alarms_with_not_admin_project(self):
|
||||
# Setup
|
||||
graph = self._create_graph()
|
||||
apis = AlarmApis(graph, None)
|
||||
apis = AlarmApis(graph, self.conf, self._db)
|
||||
ctx = {'tenant': 'project_2', 'is_admin': False}
|
||||
|
||||
# Action
|
||||
@ -69,7 +83,7 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
def test_get_alarm_counts_with_not_admin_project(self):
|
||||
# Setup
|
||||
graph = self._create_graph()
|
||||
apis = AlarmApis(graph, None)
|
||||
apis = AlarmApis(graph, self.conf, self._db)
|
||||
ctx = {'tenant': 'project_2', 'is_admin': False}
|
||||
|
||||
# Action
|
||||
@ -86,7 +100,7 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
def test_get_alarms_with_all_tenants(self):
|
||||
# Setup
|
||||
graph = self._create_graph()
|
||||
apis = AlarmApis(graph, None)
|
||||
apis = AlarmApis(graph, self.conf, self._db)
|
||||
ctx = {'tenant': 'project_1', 'is_admin': False}
|
||||
|
||||
# Action
|
||||
@ -100,7 +114,7 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
def test_get_alarm_counts_with_all_tenants(self):
|
||||
# Setup
|
||||
graph = self._create_graph()
|
||||
apis = AlarmApis(graph, None)
|
||||
apis = AlarmApis(graph, self.conf, self._db)
|
||||
ctx = {'tenant': 'project_1', 'is_admin': False}
|
||||
|
||||
# Action
|
||||
@ -117,7 +131,7 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
def test_get_rca_with_admin_project(self):
|
||||
# Setup
|
||||
graph = self._create_graph()
|
||||
apis = RcaApis(graph, None)
|
||||
apis = RcaApis(graph, self.conf, self._db)
|
||||
ctx = {'tenant': 'project_1', 'is_admin': True}
|
||||
|
||||
# Action
|
||||
@ -131,7 +145,7 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
def test_get_rca_with_not_admin_project(self):
|
||||
# Setup
|
||||
graph = self._create_graph()
|
||||
apis = RcaApis(graph, None)
|
||||
apis = RcaApis(graph, self.conf, self._db)
|
||||
ctx = {'tenant': 'project_2', 'is_admin': False}
|
||||
|
||||
# Action
|
||||
@ -147,7 +161,7 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
def test_get_rca_with_not_admin_bla_project(self):
|
||||
# Setup
|
||||
graph = self._create_graph()
|
||||
apis = RcaApis(graph, None)
|
||||
apis = RcaApis(graph, self.conf, self._db)
|
||||
ctx = {'tenant': 'project_2', 'is_admin': False}
|
||||
|
||||
# Action
|
||||
@ -161,7 +175,7 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
def test_get_rca_with_all_tenants(self):
|
||||
# Setup
|
||||
graph = self._create_graph()
|
||||
apis = RcaApis(graph, None)
|
||||
apis = RcaApis(graph, self.conf, self._db)
|
||||
ctx = {'tenant': 'project_1', 'is_admin': False}
|
||||
|
||||
# Action
|
||||
@ -433,6 +447,7 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
|
||||
def _create_graph(self):
|
||||
graph = NXGraph('Multi tenancy graph')
|
||||
self._add_alarm_persistency_subscription(graph)
|
||||
|
||||
# create vertices
|
||||
cluster_vertex = create_cluster_placeholder_vertex()
|
||||
@ -459,40 +474,54 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
VProps.NAME: 'host_1',
|
||||
VProps.RESOURCE_ID: 'host_1',
|
||||
VProps.VITRAGE_OPERATIONAL_SEVERITY:
|
||||
OperationalAlarmSeverity.SEVERE,
|
||||
VProps.VITRAGE_AGGREGATED_SEVERITY:
|
||||
OperationalAlarmSeverity.SEVERE})
|
||||
alarm_on_instance_1_vertex = self._create_alarm(
|
||||
'alarm_on_instance_1',
|
||||
'deduced_alarm',
|
||||
project_id='project_1',
|
||||
vitrage_resource_project_id='project_1',
|
||||
metadata={VProps.VITRAGE_TYPE: NOVA_INSTANCE_DATASOURCE,
|
||||
VProps.NAME: 'instance_1',
|
||||
VProps.RESOURCE_ID: 'sdg7849ythksjdg',
|
||||
VProps.VITRAGE_OPERATIONAL_SEVERITY:
|
||||
OperationalAlarmSeverity.SEVERE,
|
||||
VProps.VITRAGE_AGGREGATED_SEVERITY:
|
||||
OperationalAlarmSeverity.SEVERE})
|
||||
alarm_on_instance_2_vertex = self._create_alarm(
|
||||
'alarm_on_instance_2',
|
||||
'deduced_alarm',
|
||||
vitrage_resource_project_id='project_1',
|
||||
metadata={VProps.VITRAGE_TYPE: NOVA_INSTANCE_DATASOURCE,
|
||||
VProps.NAME: 'instance_2',
|
||||
VProps.RESOURCE_ID: 'nbfhsdugf',
|
||||
VProps.VITRAGE_OPERATIONAL_SEVERITY:
|
||||
OperationalAlarmSeverity.WARNING,
|
||||
VProps.VITRAGE_AGGREGATED_SEVERITY:
|
||||
OperationalAlarmSeverity.WARNING})
|
||||
alarm_on_instance_3_vertex = self._create_alarm(
|
||||
'alarm_on_instance_3',
|
||||
'deduced_alarm',
|
||||
project_id='project_2',
|
||||
vitrage_resource_project_id='project_2',
|
||||
metadata={VProps.VITRAGE_TYPE: NOVA_INSTANCE_DATASOURCE,
|
||||
VProps.NAME: 'instance_3',
|
||||
VProps.RESOURCE_ID: 'nbffhsdasdugf',
|
||||
VProps.VITRAGE_OPERATIONAL_SEVERITY:
|
||||
OperationalAlarmSeverity.CRITICAL,
|
||||
VProps.VITRAGE_AGGREGATED_SEVERITY:
|
||||
OperationalAlarmSeverity.CRITICAL})
|
||||
alarm_on_instance_4_vertex = self._create_alarm(
|
||||
'alarm_on_instance_4',
|
||||
'deduced_alarm',
|
||||
vitrage_resource_project_id='project_2',
|
||||
metadata={VProps.VITRAGE_TYPE: NOVA_INSTANCE_DATASOURCE,
|
||||
VProps.NAME: 'instance_4',
|
||||
VProps.RESOURCE_ID: 'ngsuy76hgd87f',
|
||||
VProps.VITRAGE_OPERATIONAL_SEVERITY:
|
||||
OperationalAlarmSeverity.WARNING,
|
||||
VProps.VITRAGE_AGGREGATED_SEVERITY:
|
||||
OperationalAlarmSeverity.WARNING})
|
||||
|
||||
# create links
|
||||
@ -500,63 +529,78 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
edges.append(graph_utils.create_edge(
|
||||
cluster_vertex.vertex_id,
|
||||
zone_vertex.vertex_id,
|
||||
EdgeLabel.CONTAINS))
|
||||
EdgeLabel.CONTAINS,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
zone_vertex.vertex_id,
|
||||
host_vertex.vertex_id,
|
||||
EdgeLabel.CONTAINS))
|
||||
EdgeLabel.CONTAINS,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
host_vertex.vertex_id,
|
||||
instance_1_vertex.vertex_id,
|
||||
EdgeLabel.CONTAINS))
|
||||
EdgeLabel.CONTAINS,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
host_vertex.vertex_id,
|
||||
instance_2_vertex.vertex_id,
|
||||
EdgeLabel.CONTAINS))
|
||||
EdgeLabel.CONTAINS,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
host_vertex.vertex_id,
|
||||
instance_3_vertex.vertex_id,
|
||||
EdgeLabel.CONTAINS))
|
||||
EdgeLabel.CONTAINS,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
host_vertex.vertex_id,
|
||||
instance_4_vertex.vertex_id,
|
||||
EdgeLabel.CONTAINS))
|
||||
EdgeLabel.CONTAINS,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
alarm_on_host_vertex.vertex_id,
|
||||
host_vertex.vertex_id,
|
||||
EdgeLabel.ON))
|
||||
EdgeLabel.ON,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
alarm_on_instance_1_vertex.vertex_id,
|
||||
instance_1_vertex.vertex_id,
|
||||
EdgeLabel.ON))
|
||||
EdgeLabel.ON,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
alarm_on_instance_2_vertex.vertex_id,
|
||||
instance_2_vertex.vertex_id,
|
||||
EdgeLabel.ON))
|
||||
EdgeLabel.ON,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
alarm_on_instance_3_vertex.vertex_id,
|
||||
instance_3_vertex.vertex_id,
|
||||
EdgeLabel.ON))
|
||||
EdgeLabel.ON,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
alarm_on_instance_4_vertex.vertex_id,
|
||||
instance_4_vertex.vertex_id,
|
||||
EdgeLabel.ON))
|
||||
EdgeLabel.ON,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
alarm_on_host_vertex.vertex_id,
|
||||
alarm_on_instance_1_vertex.vertex_id,
|
||||
EdgeLabel.CAUSES))
|
||||
EdgeLabel.CAUSES,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
alarm_on_host_vertex.vertex_id,
|
||||
alarm_on_instance_2_vertex.vertex_id,
|
||||
EdgeLabel.CAUSES))
|
||||
EdgeLabel.CAUSES,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
alarm_on_host_vertex.vertex_id,
|
||||
alarm_on_instance_3_vertex.vertex_id,
|
||||
EdgeLabel.CAUSES))
|
||||
EdgeLabel.CAUSES,
|
||||
update_timestamp=str(utcnow())))
|
||||
edges.append(graph_utils.create_edge(
|
||||
alarm_on_host_vertex.vertex_id,
|
||||
alarm_on_instance_4_vertex.vertex_id,
|
||||
EdgeLabel.CAUSES))
|
||||
EdgeLabel.CAUSES,
|
||||
update_timestamp=str(utcnow())))
|
||||
|
||||
# add vertices to graph
|
||||
graph.add_vertex(cluster_vertex)
|
||||
@ -577,3 +621,26 @@ class TestApis(TestEntityGraphUnitBase):
|
||||
graph.add_edge(edge)
|
||||
|
||||
return graph
|
||||
|
||||
def _add_alarm_persistency_subscription(self, graph):
|
||||
|
||||
self._db.alarms.delete()
|
||||
self._db.changes.delete()
|
||||
self._db.edges.delete()
|
||||
persistor_endpoint = VitragePersistorEndpoint(self._db)
|
||||
|
||||
def callback(before, curr, is_vertex, graph):
|
||||
notification_types = PersistNotifier._get_notification_type(
|
||||
before, curr, is_vertex)
|
||||
if not is_vertex:
|
||||
curr = edge_copy(
|
||||
curr.source_id, curr.target_id, curr.label,
|
||||
curr.properties)
|
||||
curr.properties[EdgeProperties.SOURCE_ID] = curr.source_id
|
||||
curr.properties[EdgeProperties.TARGET_ID] = curr.target_id
|
||||
|
||||
for notification_type in notification_types:
|
||||
persistor_endpoint.process_event(notification_type,
|
||||
curr.properties)
|
||||
|
||||
graph.subscribe(callback)
|
||||
|
@ -31,6 +31,7 @@ from vitrage.opts import register_opts
|
||||
from vitrage.tests import base
|
||||
from vitrage.tests.mocks import mock_driver as mock_sync
|
||||
from vitrage.tests.mocks import utils
|
||||
from vitrage.utils.datetime import utcnow
|
||||
|
||||
|
||||
class TestEntityGraphUnitBase(base.BaseTest):
|
||||
@ -153,17 +154,23 @@ class TestEntityGraphUnitBase(base.BaseTest):
|
||||
return events_list[0]
|
||||
|
||||
@staticmethod
|
||||
def _create_alarm(vitrage_id, alarm_type, project_id=None, metadata=None):
|
||||
def _create_alarm(vitrage_id,
|
||||
alarm_type,
|
||||
project_id=None,
|
||||
vitrage_resource_project_id=None,
|
||||
metadata=None):
|
||||
return graph_utils.create_vertex(
|
||||
vitrage_id,
|
||||
vitrage_category=EntityCategory.ALARM,
|
||||
vitrage_type=alarm_type,
|
||||
vitrage_sample_timestamp=None,
|
||||
update_timestamp=str(utcnow()),
|
||||
vitrage_is_deleted=False,
|
||||
vitrage_is_placeholder=False,
|
||||
entity_id=vitrage_id,
|
||||
entity_state='active',
|
||||
project_id=project_id,
|
||||
vitrage_resource_project_id=vitrage_resource_project_id,
|
||||
metadata=metadata
|
||||
)
|
||||
|
||||
@ -174,6 +181,7 @@ class TestEntityGraphUnitBase(base.BaseTest):
|
||||
vitrage_category=EntityCategory.RESOURCE,
|
||||
vitrage_type=resource_type,
|
||||
vitrage_sample_timestamp=None,
|
||||
update_timestamp=str(utcnow()),
|
||||
vitrage_is_deleted=False,
|
||||
vitrage_is_placeholder=False,
|
||||
entity_id=vitrage_id,
|
||||
|
@ -25,7 +25,7 @@ from vitrage.common.constants import EntityCategory
|
||||
from vitrage.common.constants import NotifierEventTypes as NType
|
||||
from vitrage.common.constants import VertexProperties as VProps
|
||||
from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE
|
||||
from vitrage.entity_graph.processor.notifier import _get_notification_type
|
||||
from vitrage.entity_graph.processor.notifier import GraphNotifier as GN
|
||||
from vitrage.evaluator.actions import evaluator_event_transformer as evaluator
|
||||
from vitrage.graph import Vertex
|
||||
from vitrage.tests import base
|
||||
@ -90,59 +90,59 @@ class GraphTest(base.BaseTest):
|
||||
return lst[0] if len(lst) > 0 else None
|
||||
|
||||
def test_notification_type_new_alarm(self):
|
||||
ret = _get_notification_type(None, deduced_alarm, True)
|
||||
ret = GN._get_notification_type(None, deduced_alarm, True)
|
||||
self.assertEqual(NType.ACTIVATE_DEDUCED_ALARM_EVENT,
|
||||
self.get_first(ret),
|
||||
'new alarm should notify activate')
|
||||
|
||||
ret = _get_notification_type(None, non_deduced_alarm, True)
|
||||
ret = GN._get_notification_type(None, non_deduced_alarm, True)
|
||||
self.assertIsNone(self.get_first(ret),
|
||||
'alarm that is not a deduced alarm')
|
||||
|
||||
def test_notification_type_deleted_alarm(self):
|
||||
ret = _get_notification_type(deduced_alarm, deleted_alarm, True)
|
||||
ret = GN._get_notification_type(deduced_alarm, deleted_alarm, True)
|
||||
self.assertEqual(NType.DEACTIVATE_DEDUCED_ALARM_EVENT,
|
||||
self.get_first(ret),
|
||||
'deleted alarm should notify deactivate')
|
||||
|
||||
def test_notification_type_resource_vertex(self):
|
||||
ret = _get_notification_type(None, resource, True)
|
||||
ret = GN._get_notification_type(None, resource, True)
|
||||
self.assertIsNone(self.get_first(ret),
|
||||
'any non alarm vertex should be ignored')
|
||||
|
||||
def test_notification_type_updated_alarm(self):
|
||||
ret = _get_notification_type(deduced_alarm, deduced_alarm, True)
|
||||
ret = GN._get_notification_type(deduced_alarm, deduced_alarm, True)
|
||||
self.assertIsNone(self.get_first(ret),
|
||||
'A not new alarm vertex should be ignored')
|
||||
|
||||
ret = _get_notification_type(deleted_alarm, deduced_alarm, True)
|
||||
ret = GN._get_notification_type(deleted_alarm, deduced_alarm, True)
|
||||
self.assertEqual(NType.ACTIVATE_DEDUCED_ALARM_EVENT,
|
||||
self.get_first(ret),
|
||||
'old alarm become not deleted should notify activate')
|
||||
|
||||
ret = _get_notification_type(placeholder_alarm, deduced_alarm, True)
|
||||
ret = GN._get_notification_type(placeholder_alarm, deduced_alarm, True)
|
||||
self.assertEqual(NType.ACTIVATE_DEDUCED_ALARM_EVENT,
|
||||
self.get_first(ret),
|
||||
'placeholder become active should notify activate')
|
||||
|
||||
def test_notification_type_placeholder_alarm(self):
|
||||
ret = _get_notification_type(None, placeholder_alarm, True)
|
||||
ret = GN._get_notification_type(None, placeholder_alarm, True)
|
||||
self.assertIsNone(self.get_first(ret),
|
||||
'A not new alarm vertex should be ignored')
|
||||
|
||||
def test_notification_type_new_host(self):
|
||||
ret = _get_notification_type(None, forced_down_host, True)
|
||||
ret = GN._get_notification_type(None, forced_down_host, True)
|
||||
self.assertEqual(NType.ACTIVATE_MARK_DOWN_EVENT,
|
||||
self.get_first(ret),
|
||||
'new host with forced_down should notify activate')
|
||||
|
||||
ret = _get_notification_type(None, host, True)
|
||||
ret = GN._get_notification_type(None, host, True)
|
||||
self.assertIsNone(self.get_first(ret), 'host without forced_down')
|
||||
|
||||
def test_notification_type_deleted_host(self):
|
||||
deleted_host = copy.deepcopy(forced_down_host)
|
||||
deleted_host[VProps.VITRAGE_IS_DELETED] = True
|
||||
ret = _get_notification_type(forced_down_host, deleted_host, True)
|
||||
ret = GN._get_notification_type(forced_down_host, deleted_host, True)
|
||||
self.assertEqual(
|
||||
NType.DEACTIVATE_MARK_DOWN_EVENT,
|
||||
self.get_first(ret),
|
||||
@ -150,7 +150,7 @@ class GraphTest(base.BaseTest):
|
||||
|
||||
deleted_host = copy.deepcopy(host)
|
||||
deleted_host[VProps.VITRAGE_IS_DELETED] = True
|
||||
ret = _get_notification_type(forced_down_host, deleted_host, True)
|
||||
ret = GN._get_notification_type(forced_down_host, deleted_host, True)
|
||||
self.assertEqual(
|
||||
NType.DEACTIVATE_MARK_DOWN_EVENT,
|
||||
self.get_first(ret),
|
||||
@ -158,32 +158,34 @@ class GraphTest(base.BaseTest):
|
||||
|
||||
deleted_host = copy.deepcopy(host)
|
||||
deleted_host[VProps.VITRAGE_IS_DELETED] = True
|
||||
ret = _get_notification_type(host, deleted_host, True)
|
||||
ret = GN._get_notification_type(host, deleted_host, True)
|
||||
self.assertIsNone(
|
||||
self.get_first(ret),
|
||||
'deleted host without forced_down should not notify')
|
||||
|
||||
def test_notification_type_updated_host(self):
|
||||
ret = _get_notification_type(forced_down_host, forced_down_host, True)
|
||||
ret = GN._get_notification_type(
|
||||
forced_down_host, forced_down_host, True)
|
||||
self.assertIsNone(self.get_first(ret),
|
||||
'A not new host should be ignored')
|
||||
|
||||
deleted_host = copy.deepcopy(forced_down_host)
|
||||
deleted_host[VProps.VITRAGE_IS_DELETED] = True
|
||||
ret = _get_notification_type(deleted_host, forced_down_host, True)
|
||||
ret = GN._get_notification_type(deleted_host, forced_down_host, True)
|
||||
self.assertEqual(NType.ACTIVATE_MARK_DOWN_EVENT,
|
||||
self.get_first(ret),
|
||||
'old host become not deleted should notify activate')
|
||||
|
||||
deleted_host = copy.deepcopy(forced_down_host)
|
||||
deleted_host[VProps.VITRAGE_IS_DELETED] = True
|
||||
ret = _get_notification_type(deleted_host, host, True)
|
||||
ret = GN._get_notification_type(deleted_host, host, True)
|
||||
self.assertIsNone(self.get_first(ret),
|
||||
'old host become not deleted should not notify')
|
||||
|
||||
placeholder_host = copy.deepcopy(forced_down_host)
|
||||
placeholder_host[VProps.VITRAGE_IS_PLACEHOLDER] = True
|
||||
ret = _get_notification_type(placeholder_host, forced_down_host, True)
|
||||
ret = GN._get_notification_type(
|
||||
placeholder_host, forced_down_host, True)
|
||||
self.assertEqual(NType.ACTIVATE_MARK_DOWN_EVENT,
|
||||
self.get_first(ret),
|
||||
'placeholder become active should notify activate')
|
||||
@ -191,6 +193,6 @@ class GraphTest(base.BaseTest):
|
||||
def test_notification_type_placeholder_host(self):
|
||||
placeholder_host = copy.deepcopy(forced_down_host)
|
||||
placeholder_host[VProps.VITRAGE_IS_PLACEHOLDER] = True
|
||||
ret = _get_notification_type(None, placeholder_host, True)
|
||||
ret = GN._get_notification_type(None, placeholder_host, True)
|
||||
self.assertIsNone(self.get_first(ret),
|
||||
'A not new host vertex should be ignored')
|
||||
|
Loading…
Reference in New Issue
Block a user