289 lines
11 KiB
Python
289 lines
11 KiB
Python
# Copyright 2015 - Alcatel-Lucent
|
|
# Copyright 2016 - Nokia
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from oslo_log import log
|
|
|
|
from vitrage.common.constants import EventAction
|
|
from vitrage.common.constants import VertexProperties as VProps
|
|
from vitrage.entity_graph.processor import base as processor
|
|
from vitrage.entity_graph.processor import entity_graph
|
|
from vitrage.entity_graph.states.state_manager import StateManager
|
|
from vitrage.entity_graph.transformer_manager import TransformerManager
|
|
from vitrage.graph import Direction
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
class Processor(processor.ProcessorBase):
|
|
|
|
def __init__(self, cfg, initialization_status, e_graph=None):
|
|
self.cfg = cfg
|
|
self.transformer_manager = TransformerManager(self.cfg)
|
|
self.state_manager = StateManager(self.cfg)
|
|
self._initialize_events_actions()
|
|
self.initialization_status = initialization_status
|
|
self.entity_graph = entity_graph.EntityGraph("Entity Graph") if \
|
|
e_graph is None else e_graph
|
|
|
|
def process_event(self, event):
|
|
"""Decides which action to run on given event
|
|
|
|
Transforms the event into a tupple (vertex, neighbors,action).
|
|
After transforming, it runs the correct action according to the
|
|
action received from the transformer.
|
|
|
|
:param event: The event to be processed
|
|
:type event: Dictionary
|
|
"""
|
|
|
|
entity = self.transform_entity(event)
|
|
self._calculate_aggregated_state(entity.vertex, entity.action)
|
|
return self.actions[entity.action](entity.vertex, entity.neighbors)
|
|
|
|
def create_entity(self, new_vertex, neighbors):
|
|
"""Adds new vertex to the entity graph
|
|
|
|
Adds the entity to the entity graph, and connects it's neighbors
|
|
|
|
:param new_vertex: The new vertex to add to graph
|
|
:type new_vertex: Vertex
|
|
|
|
:param neighbors: The neighbors of the new vertex
|
|
:type neighbors: List
|
|
"""
|
|
|
|
LOG.debug("Add entity to entity graph: %s", new_vertex)
|
|
self.entity_graph.add_vertex(new_vertex)
|
|
self._connect_neighbors(neighbors, [], EventAction.CREATE_ENTITY)
|
|
|
|
def update_entity(self, updated_vertex, neighbors):
|
|
"""Updates the vertex in the entity graph
|
|
|
|
Updates the in entity in the entity graph. In addition it removes old
|
|
neighbor connections, and connects the new neighbors.
|
|
|
|
:param updated_vertex: The vertex to be updated in the graph
|
|
:type updated_vertex: Vertex
|
|
|
|
:param neighbors: The neighbors of the updated vertex
|
|
:type neighbors: List
|
|
"""
|
|
|
|
LOG.debug("Update entity in entity graph: %s", updated_vertex)
|
|
|
|
graph_vertex = \
|
|
self.entity_graph.get_vertex(updated_vertex.vertex_id)
|
|
|
|
if (not graph_vertex) or self.entity_graph.check_update_validation(
|
|
graph_vertex, updated_vertex):
|
|
self.entity_graph.update_vertex(updated_vertex)
|
|
self._update_neighbors(updated_vertex, neighbors)
|
|
else:
|
|
LOG.info("Update event arrived on invalid resource: %s",
|
|
updated_vertex)
|
|
|
|
def delete_entity(self, deleted_vertex, neighbors):
|
|
"""Deletes the vertex from the entity graph
|
|
|
|
Marks the corresponding vertex and its edges as deleted
|
|
|
|
:param deleted_vertex: The vertex to be deleted from the graph
|
|
:type deleted_vertex: Vertex
|
|
|
|
:param neighbors: The neighbors of the deleted vertex
|
|
:type neighbors: List
|
|
"""
|
|
|
|
LOG.debug("Delete entity from entity graph: %s", deleted_vertex)
|
|
|
|
graph_vertex = \
|
|
self.entity_graph.get_vertex(deleted_vertex.vertex_id)
|
|
|
|
if (not graph_vertex) or self.entity_graph.check_update_validation(
|
|
graph_vertex, deleted_vertex):
|
|
neighbor_vertices = self.entity_graph.neighbors(
|
|
deleted_vertex.vertex_id)
|
|
neighbor_edges = self.entity_graph.get_edges(
|
|
deleted_vertex.vertex_id)
|
|
|
|
for edge in neighbor_edges:
|
|
self.entity_graph.mark_edge_as_deleted(edge)
|
|
|
|
for vertex in neighbor_vertices:
|
|
self.entity_graph.delete_placeholder_vertex(vertex)
|
|
|
|
self.entity_graph.mark_vertex_as_deleted(deleted_vertex)
|
|
else:
|
|
LOG.info("Delete event arrived on invalid resource: %s",
|
|
deleted_vertex)
|
|
|
|
def update_relationship(self, entity_vertex, neighbors):
|
|
LOG.debug("Update relationship in entity graph: %s", neighbors)
|
|
|
|
for neighbor in neighbors:
|
|
# TODO(Alexey): maybe to check if the vertices exists
|
|
self.entity_graph.update_edge(neighbor.edge)
|
|
|
|
def delete_relationship(self, updated_vertex, neighbors):
|
|
LOG.debug("Delete relationship from entity graph: %s", neighbors)
|
|
|
|
for neighbor in neighbors:
|
|
# TODO(Alexey): maybe to check if the vertices exists
|
|
self.entity_graph.remove_edge(neighbor.edge)
|
|
|
|
def handle_end_message(self, vertex, neighbors):
|
|
self.initialization_status.end_messages[vertex[VProps.TYPE]] = True
|
|
if len(self.initialization_status.end_messages) == \
|
|
len(self.cfg.synchronizer_plugins.plugin_type):
|
|
self.initialization_status.status = \
|
|
self.initialization_status.RECEIVED_ALL_END_MESSAGES
|
|
|
|
def transform_entity(self, event):
|
|
entity = self.transformer_manager.transform(event)
|
|
LOG.debug('Transformed entity: %s', entity)
|
|
return entity
|
|
|
|
def _update_neighbors(self, vertex, neighbors):
|
|
"""Updates vertices neighbor connections
|
|
|
|
1. Removes old neighbor connections
|
|
2. connects the new neighbors.
|
|
"""
|
|
|
|
(valid_edges, obsolete_edges) = self._find_edges_status(
|
|
vertex, neighbors)
|
|
self._delete_old_connections(vertex, obsolete_edges)
|
|
self._connect_neighbors(neighbors,
|
|
valid_edges,
|
|
EventAction.UPDATE_ENTITY)
|
|
|
|
def _connect_neighbors(self, neighbors, valid_edges, action):
|
|
"""Updates the neighbor vertex and adds the connection edges """
|
|
LOG.debug("Connect neighbors. Neighbors: %s, valid_edges: %s",
|
|
neighbors, valid_edges)
|
|
for (vertex, edge) in neighbors:
|
|
graph_vertex = self.entity_graph.get_vertex(vertex.vertex_id)
|
|
if not graph_vertex or \
|
|
not self.entity_graph.is_vertex_deleted(graph_vertex):
|
|
if self.entity_graph.can_update_vertex(graph_vertex, vertex):
|
|
LOG.debug("Updates vertex: %s", vertex)
|
|
self._calculate_aggregated_state(vertex, action)
|
|
self.entity_graph.update_vertex(vertex)
|
|
|
|
if edge not in valid_edges:
|
|
LOG.debug("Updates edge: %s", edge)
|
|
self.entity_graph.update_edge(edge)
|
|
else:
|
|
LOG.debug("neighbor vertex wasn't updated: %s", vertex)
|
|
|
|
def _delete_old_connections(self, vertex, obsolete_edges):
|
|
"""Deletes the "vertex" old connections
|
|
|
|
Finds the old connections that are connected to updated_vertex,
|
|
and marks them as deleted
|
|
"""
|
|
|
|
LOG.debug("Delete old connections. Vertex: %s, old edges: %s",
|
|
vertex, obsolete_edges)
|
|
# remove old edges and placeholder vertices if exist
|
|
for edge in obsolete_edges:
|
|
self.entity_graph.mark_edge_as_deleted(edge)
|
|
graph_ver = self.entity_graph.get_vertex(
|
|
edge.other_vertex(vertex.vertex_id))
|
|
self.entity_graph.delete_placeholder_vertex(graph_ver)
|
|
|
|
def _find_edges_status(self, vertex, neighbors):
|
|
"""Finds "vertex" valid and old connections
|
|
|
|
Checks all the edges that are connected to the vertex in the entity
|
|
graph, and finds which of them are old connections (edges that are no
|
|
longer connected to those entities), and which are valid connections.
|
|
"""
|
|
|
|
valid_edges = []
|
|
obsolete_edges = []
|
|
|
|
graph_neighbor_types = \
|
|
self.entity_graph.find_neighbor_types(neighbors)
|
|
|
|
for curr_edge in self.entity_graph.get_edges(
|
|
vertex.vertex_id,
|
|
direction=Direction.BOTH):
|
|
# check if the edge in the graph has a a connection to the
|
|
# same type of resources in the new neighbors list
|
|
neighbor_vertex = self.entity_graph.get_vertex(
|
|
curr_edge.other_vertex(vertex.vertex_id))
|
|
|
|
is_connection_type_exist = self.entity_graph.get_vertex_category(
|
|
neighbor_vertex) in graph_neighbor_types
|
|
|
|
if not is_connection_type_exist:
|
|
valid_edges.append(curr_edge)
|
|
continue
|
|
|
|
neighbor_edges = [e for v, e in neighbors]
|
|
if curr_edge in neighbor_edges:
|
|
valid_edges.append(curr_edge)
|
|
else:
|
|
obsolete_edges.append(curr_edge)
|
|
|
|
return valid_edges, obsolete_edges
|
|
|
|
def _initialize_events_actions(self):
|
|
self.actions = {
|
|
EventAction.CREATE_ENTITY: self.create_entity,
|
|
EventAction.UPDATE_ENTITY: self.update_entity,
|
|
EventAction.DELETE_ENTITY: self.delete_entity,
|
|
EventAction.UPDATE_RELATIONSHIP: self.update_relationship,
|
|
EventAction.DELETE_RELATIONSHIP: self.delete_relationship,
|
|
EventAction.END_MESSAGE: self.handle_end_message
|
|
}
|
|
|
|
def _calculate_aggregated_state(self, vertex, action):
|
|
LOG.debug("calculate event state")
|
|
|
|
if action in [EventAction.UPDATE_ENTITY,
|
|
EventAction.DELETE_ENTITY,
|
|
EventAction.CREATE_ENTITY]:
|
|
graph_vertex = self.entity_graph.get_vertex(vertex.vertex_id)
|
|
elif action in [EventAction.END_MESSAGE,
|
|
EventAction.UPDATE_RELATIONSHIP,
|
|
EventAction.DELETE_RELATIONSHIP]:
|
|
return None
|
|
else:
|
|
LOG.info('not recognized action: %s for vertex: %s',
|
|
action, vertex)
|
|
return None
|
|
|
|
state = self._get_updated_property(vertex, graph_vertex, VProps.STATE)
|
|
vitrage_state = self._get_updated_property(vertex,
|
|
graph_vertex,
|
|
VProps.VITRAGE_STATE)
|
|
|
|
vertex_type = vertex[VProps.TYPE] if VProps.TYPE in vertex.properties \
|
|
else graph_vertex[VProps.TYPE]
|
|
|
|
vertex[VProps.AGGREGATED_STATE] = self.state_manager.aggregated_state(
|
|
state, vitrage_state, vertex_type)
|
|
|
|
@staticmethod
|
|
def _get_updated_property(new_vertex, graph_vertex, prop):
|
|
if new_vertex and prop in new_vertex.properties:
|
|
return new_vertex[prop]
|
|
elif graph_vertex and prop in graph_vertex.properties:
|
|
return graph_vertex[prop]
|
|
|
|
return None
|