OpenStack RCA (Root Cause Analysis) Engine
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

385 lines
15 KiB

  1. # Copyright 2015 - Alcatel-Lucent
  2. # Copyright 2016 - Nokia
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. from oslo_log import log
  16. from vitrage.common.constants import EntityCategory as ECategory
  17. from vitrage.common.constants import GraphAction
  18. from vitrage.common.constants import VertexProperties as VProps
  19. from vitrage.datasources.transformer_base import TransformerBase
  20. from vitrage.entity_graph.mappings.datasource_info_mapper import \
  21. DatasourceInfoMapper
  22. from vitrage.entity_graph.processor import base as processor
  23. from vitrage.entity_graph.processor import processor_utils as PUtils
  24. from vitrage.entity_graph.processor.transformer_manager import \
  25. TransformerManager
  26. from vitrage.graph import Direction
  27. LOG = log.getLogger(__name__)
  28. class Processor(processor.ProcessorBase):
  29. def __init__(self, e_graph=None):
  30. super(Processor, self).__init__()
  31. self.transformer_manager = TransformerManager()
  32. self.info_mapper = DatasourceInfoMapper()
  33. self._initialize_events_actions()
  34. self.entity_graph = e_graph
  35. def process_event(self, event):
  36. """Decides which action to run on given event
  37. Transforms the event into a tuple (vertex, neighbors,action).
  38. After transforming, it runs the correct action according to the
  39. action received from the transformer.
  40. :param event: The event to be processed
  41. :type event: Dictionary
  42. """
  43. LOG.debug('processor event:\n%s', event)
  44. self._enrich_event(event)
  45. entity = self.transformer_manager.transform(event)
  46. if entity.action not in self.actions:
  47. LOG.warning('Deprecated or unknown entity %s ignored', entity)
  48. return
  49. self._calculate_vitrage_aggregated_values(entity.vertex, entity.action)
  50. self._set_datasource_name(entity, event)
  51. self.actions[entity.action](entity.vertex, entity.neighbors)
  52. def create_entity(self, new_vertex, neighbors):
  53. """Adds new vertex to the entity graph
  54. Adds the entity to the entity graph, and connects it's neighbors
  55. :param new_vertex: The new vertex to add to graph
  56. :type new_vertex: Vertex
  57. :param neighbors: The neighbors of the new vertex
  58. :type neighbors: List
  59. """
  60. LOG.debug('Add entity to entity graph:\n%s', new_vertex)
  61. self._add_resource_details_to_alarm(new_vertex, neighbors)
  62. self.entity_graph.add_vertex(new_vertex)
  63. self._connect_neighbors(neighbors, set(), GraphAction.CREATE_ENTITY)
  64. def update_entity(self, updated_vertex, neighbors):
  65. """Updates the vertex in the entity graph
  66. Updates the in entity in the entity graph. In addition it removes old
  67. neighbor connections, and connects the new neighbors.
  68. :param updated_vertex: The vertex to be updated in the graph
  69. :type updated_vertex: Vertex
  70. :param neighbors: The neighbors of the updated vertex
  71. :type neighbors: List
  72. """
  73. LOG.debug('Update entity in entity graph:\n%s', updated_vertex)
  74. graph_vertex = self.entity_graph.get_vertex(updated_vertex.vertex_id)
  75. if graph_vertex and not PUtils.is_newer_vertex(graph_vertex,
  76. updated_vertex):
  77. LOG.warning("Update event arrived later than expected - "
  78. "graph_vertex: %s --- updated_vertex: %s",
  79. graph_vertex, updated_vertex)
  80. return
  81. self._add_resource_details_to_alarm(updated_vertex, neighbors)
  82. PUtils.update_entity_graph_vertex(self.entity_graph,
  83. graph_vertex,
  84. updated_vertex)
  85. self._update_neighbors(updated_vertex, neighbors)
  86. def delete_entity(self, deleted_vertex, neighbors):
  87. """Deletes the vertex from the entity graph
  88. Marks the corresponding vertex and its edges as deleted
  89. :param deleted_vertex: The vertex to be deleted from the graph
  90. :type deleted_vertex: Vertex
  91. :param neighbors: The neighbors of the deleted vertex
  92. :type neighbors: List
  93. """
  94. LOG.debug('Delete entity from entity graph:\n%s', deleted_vertex)
  95. graph_vertex = self.entity_graph.get_vertex(deleted_vertex.vertex_id)
  96. if not graph_vertex:
  97. LOG.warning('Delete - vertex not found %s', deleted_vertex)
  98. return
  99. elif PUtils.is_deleted(graph_vertex):
  100. LOG.warning('Delete - vertex already deleted - '
  101. "graph_vertex: %s --- deleted_vertex: %s",
  102. graph_vertex, deleted_vertex)
  103. return
  104. elif not PUtils.is_newer_vertex(graph_vertex, deleted_vertex):
  105. LOG.warning("Delete event arrived later than expected - "
  106. "graph_vertex: %s --- deleted_vertex: %s",
  107. graph_vertex, deleted_vertex)
  108. return
  109. neighbor_vertices = self.entity_graph.neighbors(
  110. deleted_vertex.vertex_id)
  111. neighbor_edges = self.entity_graph.get_edges(
  112. deleted_vertex.vertex_id)
  113. for edge in neighbor_edges:
  114. PUtils.mark_deleted(self.entity_graph, edge)
  115. for vertex in neighbor_vertices:
  116. PUtils.delete_placeholder_vertex(self.entity_graph, vertex)
  117. PUtils.mark_deleted(self.entity_graph, deleted_vertex)
  118. def update_relationship(self, entity_vertex, neighbors):
  119. LOG.debug('Update relationship in entity graph:\n%s', neighbors)
  120. if entity_vertex:
  121. self.entity_graph.update_vertex(entity_vertex)
  122. for neighbor in neighbors:
  123. self.entity_graph.update_edge(neighbor.edge)
  124. def delete_relationship(self, updated_vertex, neighbors):
  125. LOG.debug('Delete relationship from entity graph:\n%s', neighbors)
  126. if updated_vertex:
  127. self.entity_graph.update_vertex(updated_vertex)
  128. for neighbor in neighbors:
  129. graph_edge = self.entity_graph.get_edge(neighbor.edge.source_id,
  130. neighbor.edge.target_id,
  131. neighbor.edge.label)
  132. if graph_edge:
  133. PUtils.mark_deleted(self.entity_graph, graph_edge)
  134. def remove_deleted_entity(self, vertex, neighbors):
  135. """Removes the deleted vertex from the entity graph
  136. Removes vertex that it's vitrage_is_deleted value is True
  137. :param vertex: The vertex to be removed from the graph
  138. :type vertex: Vertex
  139. :param neighbors: The neighbors of the deleted vertex
  140. :type neighbors: List - It's just a mock in this method
  141. """
  142. LOG.debug('Remove deleted entity from entity graph:\n%s', vertex)
  143. graph_vertex = self.entity_graph.get_vertex(vertex.vertex_id)
  144. if graph_vertex and PUtils.is_deleted(graph_vertex) and \
  145. PUtils.is_newer_vertex(graph_vertex, vertex):
  146. self.entity_graph.remove_vertex(vertex)
  147. else:
  148. LOG.warning("Remove deleted entity arrived on invalid resource: "
  149. "deleted_vertex - %s, graph_vertex - %s",
  150. vertex, graph_vertex)
  151. def _update_neighbors(self, vertex, neighbors):
  152. """Updates vertices neighbor connections
  153. 1. Removes old neighbor connections
  154. 2. connects the new neighbors.
  155. """
  156. (valid_edges, obsolete_edges) = self._find_edges_status(vertex,
  157. neighbors)
  158. self._delete_old_connections(vertex, obsolete_edges)
  159. self._connect_neighbors(neighbors,
  160. valid_edges,
  161. GraphAction.UPDATE_ENTITY)
  162. def _connect_neighbors(self, neighbors, valid_edges, action):
  163. """Updates the neighbor vertex and adds the connection edges """
  164. if not neighbors:
  165. LOG.debug('connect_neighbors - nothing to do')
  166. return
  167. LOG.debug("Connect neighbors. Neighbors: %s, valid_edges: %s",
  168. neighbors, valid_edges)
  169. for (vertex, edge) in neighbors:
  170. graph_vertex = self.entity_graph.get_vertex(vertex.vertex_id)
  171. if not graph_vertex or not PUtils.is_deleted(graph_vertex):
  172. if graph_vertex and not PUtils.is_newer_vertex(graph_vertex,
  173. vertex):
  174. LOG.warning("Neighbor update event arrived later than "
  175. "expected - graph_vertex: %s --- "
  176. "updated_vertex: %s", graph_vertex, vertex)
  177. else:
  178. LOG.debug("Updates vertex: %s", vertex)
  179. self._calculate_vitrage_aggregated_values(vertex, action)
  180. PUtils.update_entity_graph_vertex(self.entity_graph,
  181. graph_vertex,
  182. vertex)
  183. if edge not in valid_edges:
  184. LOG.debug("Updates edge: %s", edge)
  185. self.entity_graph.update_edge(edge)
  186. else:
  187. LOG.debug("neighbor vertex wasn't updated: %s", vertex)
  188. def _delete_old_connections(self, vertex, obsolete_edges):
  189. """Deletes the "vertex" old connections
  190. Finds the old connections that are connected to updated_vertex,
  191. and marks them as deleted
  192. """
  193. if not obsolete_edges:
  194. LOG.debug('obsolete_edges - nothing to do')
  195. return
  196. LOG.debug('Delete old connections. Vertex:\n%s', vertex)
  197. # remove old edges and placeholder vertices if exist
  198. for edge in obsolete_edges:
  199. LOG.debug("Delete obsolete edge:\n%s", edge)
  200. PUtils.mark_deleted(self.entity_graph, edge)
  201. graph_ver = self.entity_graph.get_vertex(
  202. edge.other_vertex(vertex.vertex_id))
  203. PUtils.delete_placeholder_vertex(self.entity_graph, graph_ver)
  204. def _find_edges_status(self, vertex, neighbors):
  205. """Finds "vertex" valid and old connections
  206. Checks all the edges that are connected to the vertex in the entity
  207. graph, and finds which of them are old connections (edges that are no
  208. longer connected to those entities), and which are valid connections.
  209. """
  210. valid_edges = set()
  211. obsolete_edges = set()
  212. graph_neighbor_types = \
  213. PUtils.find_neighbor_types(neighbors)
  214. neighbor_edges = set(e for v, e in neighbors)
  215. for curr_edge in self.entity_graph.get_edges(vertex.vertex_id,
  216. direction=Direction.BOTH):
  217. # check if the edge in the graph has a connection to the
  218. # same type of resources in the new neighbors list
  219. neighbor_vertex = self.entity_graph.get_vertex(
  220. curr_edge.other_vertex(vertex.vertex_id))
  221. is_connection_type_exist = PUtils.get_vertex_types(
  222. neighbor_vertex) in graph_neighbor_types
  223. if not is_connection_type_exist:
  224. valid_edges.add(curr_edge)
  225. continue
  226. if curr_edge in neighbor_edges:
  227. valid_edges.add(curr_edge)
  228. else:
  229. obsolete_edges.add(curr_edge)
  230. return valid_edges, obsolete_edges
  231. def _initialize_events_actions(self):
  232. self.actions = {
  233. GraphAction.CREATE_ENTITY: self.create_entity,
  234. GraphAction.UPDATE_ENTITY: self.update_entity,
  235. GraphAction.DELETE_ENTITY: self.delete_entity,
  236. GraphAction.UPDATE_RELATIONSHIP: self.update_relationship,
  237. GraphAction.DELETE_RELATIONSHIP: self.delete_relationship,
  238. GraphAction.REMOVE_DELETED_ENTITY: self.remove_deleted_entity,
  239. }
  240. def _calculate_vitrage_aggregated_values(self, vertex, action):
  241. LOG.debug("calculate event state")
  242. try:
  243. if action in [GraphAction.UPDATE_ENTITY,
  244. GraphAction.DELETE_ENTITY,
  245. GraphAction.CREATE_ENTITY]:
  246. graph_vertex = self.entity_graph.get_vertex(
  247. vertex.vertex_id)
  248. elif action in [GraphAction.REMOVE_DELETED_ENTITY,
  249. GraphAction.UPDATE_RELATIONSHIP,
  250. GraphAction.DELETE_RELATIONSHIP]:
  251. return None
  252. else:
  253. LOG.error('unrecognized action: %s for vertex: %s',
  254. action, vertex)
  255. return None
  256. self.info_mapper.vitrage_aggregate_values(vertex, graph_vertex)
  257. except Exception:
  258. LOG.exception("Calculate aggregated state failed.")
  259. def _enrich_event(self, event):
  260. attr = self.transformer_manager.get_enrich_query(event)
  261. if attr is None:
  262. return
  263. result = self.entity_graph.get_vertices(attr)
  264. event[TransformerBase.QUERY_RESULT] = result
  265. def _add_resource_details_to_alarm(self, vertex, neighbors):
  266. if not neighbors:
  267. return
  268. resource = None
  269. alarms = []
  270. if vertex.get(VProps.VITRAGE_CATEGORY) == ECategory.ALARM:
  271. alarms = [vertex]
  272. for neighbor in neighbors:
  273. if neighbor.vertex.get(VProps.VITRAGE_CATEGORY) ==\
  274. ECategory.RESOURCE:
  275. resource = neighbor.vertex
  276. elif vertex.get(VProps.VITRAGE_CATEGORY) == ECategory.RESOURCE:
  277. resource = vertex
  278. for neighbor in neighbors:
  279. if neighbor.vertex.get(VProps.VITRAGE_CATEGORY) == \
  280. ECategory.ALARM:
  281. alarms.append(neighbor.vertex)
  282. for alarm in alarms:
  283. if not resource:
  284. continue
  285. project_id = resource.get(VProps.PROJECT_ID)
  286. if not project_id:
  287. n_vertex = self.entity_graph.get_vertex(resource.vertex_id)
  288. project_id = n_vertex.get(VProps.PROJECT_ID) \
  289. if n_vertex else None
  290. self.add_resource_details(
  291. alarm,
  292. r_id=resource.vertex_id,
  293. r_type=resource.get(VProps.VITRAGE_TYPE),
  294. r_project_id=project_id)
  295. @staticmethod
  296. def add_resource_details(alarm, r_id, r_type, r_project_id):
  297. alarm[VProps.VITRAGE_RESOURCE_ID] = r_id
  298. alarm[VProps.VITRAGE_RESOURCE_TYPE] = r_type
  299. if r_project_id:
  300. alarm[VProps.VITRAGE_RESOURCE_PROJECT_ID] = r_project_id
  301. @staticmethod
  302. def _set_datasource_name(entity, event):
  303. if entity.vertex and \
  304. (entity.action == GraphAction.CREATE_ENTITY or
  305. entity.action == GraphAction.UPDATE_ENTITY):
  306. datasource_name = event.get(VProps.VITRAGE_DATASOURCE_NAME)
  307. if datasource_name:
  308. entity.vertex.properties[VProps.VITRAGE_DATASOURCE_NAME] = \
  309. datasource_name