diff --git a/vitrage/graph/driver/networkx_graph.py b/vitrage/graph/driver/networkx_graph.py index 0c7c816a9..345292743 100644 --- a/vitrage/graph/driver/networkx_graph.py +++ b/vitrage/graph/driver/networkx_graph.py @@ -17,6 +17,7 @@ import json import networkx as nx from networkx.algorithms.operators.binary import compose from networkx.readwrite import json_graph +from six.moves import cPickle from oslo_log import log as logging @@ -293,13 +294,16 @@ class NXGraph(Graph): return json.dumps(node_link_data) - def to_json(self): - return json_graph.node_link_data(self._g) + def write_gpickle(self): + return cPickle.dumps(self._g, cPickle.HIGHEST_PROTOCOL) @staticmethod - def from_json(data): - graph = NXGraph() - graph._g = nx.MultiDiGraph(json_graph.node_link_graph(data)) + def read_gpickle(data, graph_to_update=None): + if graph_to_update is not None: + graph = graph_to_update + else: + graph = NXGraph() + graph._g = cPickle.loads(data) return graph def union(self, other_graph): diff --git a/vitrage/persistency/__init__.py b/vitrage/persistency/__init__.py index 71eac10b3..a7950a197 100644 --- a/vitrage/persistency/__init__.py +++ b/vitrage/persistency/__init__.py @@ -21,9 +21,8 @@ OPTS = [ 'datasources to the persistor'), cfg.BoolOpt('enable_persistency', default=False, - help='Periodically store the entire graph snapshot to ' - 'the database'), + help='Periodically store entity graph snapshot to database'), cfg.IntOpt('graph_persistency_interval', default=3600, - help='Store the graph to the database every X seconds'), + help='Store graph to database every X seconds'), ] diff --git a/vitrage/persistency/graph_persistor.py b/vitrage/persistency/graph_persistor.py index 4b97d35a0..e0f46b8e7 100644 --- a/vitrage/persistency/graph_persistor.py +++ b/vitrage/persistency/graph_persistor.py @@ -36,7 +36,7 @@ class GraphPersistor(object): def store_graph(self, graph): try: - graph_snapshot = graph.to_json() + graph_snapshot = graph.write_gpickle() db_row = models.GraphSnapshot( last_event_timestamp=self.last_event_timestamp, graph_snapshot=graph_snapshot) @@ -47,7 +47,7 @@ class GraphPersistor(object): def load_graph(self, timestamp=None): db_row = self.db_connection.graph_snapshots.query(timestamp) if \ timestamp else self.db_connection.graph_snapshots.query(utcnow()) - return NXGraph.from_json(db_row.graph_snapshot) if db_row else None + return NXGraph.read_gpickle(db_row.graph_snapshot) if db_row else None def delete_graph_snapshots(self, timestamp): """Deletes all graph snapshots until timestamp""" diff --git a/vitrage/storage/impl_sqlalchemy.py b/vitrage/storage/impl_sqlalchemy.py index 850c9bf75..b6a439e7a 100644 --- a/vitrage/storage/impl_sqlalchemy.py +++ b/vitrage/storage/impl_sqlalchemy.py @@ -277,7 +277,7 @@ class EventsConnection(base.EventsConnection, BaseTableConn): lt_collector_timestamp, query) - return query.all() + return query.order_by(models.Event.collector_timestamp.desc()).all() @staticmethod def _update_query_gt_lt(gt_collector_timestamp, diff --git a/vitrage/storage/sqlalchemy/models.py b/vitrage/storage/sqlalchemy/models.py index c48a2232e..06bd0c339 100644 --- a/vitrage/storage/sqlalchemy/models.py +++ b/vitrage/storage/sqlalchemy/models.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. import json +import zlib from oslo_db.sqlalchemy import models @@ -49,14 +50,23 @@ class JSONEncodedDict(types.TypeDecorator): impl = types.TEXT def process_bind_param(self, value, dialect): - if value is not None: - value = json.dumps(value) - return value + return json.dumps(value) if value else None def process_result_value(self, value, dialect): - if value is not None: - value = json.loads(value) - return value + return json.loads(value) if value else None + + +class CompressedBinary(types.TypeDecorator): + impl = types.LargeBinary + + def process_bind_param(self, value, dialect): + return zlib.compress(value) if value else None + + def process_result_value(self, value, dialect): + return zlib.decompress(value) if value else None + + def copy(self, **kwargs): + return CompressedBinary(self.impl.length) class Event(Base): @@ -125,7 +135,7 @@ class GraphSnapshot(Base): __tablename__ = 'graph_snapshots' last_event_timestamp = Column(DateTime, primary_key=True, nullable=False) - graph_snapshot = Column(JSONEncodedDict(), nullable=False) + graph_snapshot = Column(CompressedBinary((2 ** 32) - 1), nullable=False) def __repr__(self): return \