Merge "changes to graph_snapshots table"

This commit is contained in:
Zuul 2018-02-20 16:45:01 +00:00 committed by Gerrit Code Review
commit 3578038dd0
5 changed files with 31 additions and 18 deletions

View File

@ -17,6 +17,7 @@ import json
import networkx as nx import networkx as nx
from networkx.algorithms.operators.binary import compose from networkx.algorithms.operators.binary import compose
from networkx.readwrite import json_graph from networkx.readwrite import json_graph
from six.moves import cPickle
from oslo_log import log as logging from oslo_log import log as logging
@ -324,13 +325,16 @@ class NXGraph(Graph):
return json.dumps(node_link_data) return json.dumps(node_link_data)
def to_json(self): def write_gpickle(self):
return json_graph.node_link_data(self._g) return cPickle.dumps(self._g, cPickle.HIGHEST_PROTOCOL)
@staticmethod @staticmethod
def from_json(data): def read_gpickle(data, graph_to_update=None):
if graph_to_update is not None:
graph = graph_to_update
else:
graph = NXGraph() graph = NXGraph()
graph._g = nx.MultiDiGraph(json_graph.node_link_graph(data)) graph._g = cPickle.loads(data)
return graph return graph
def union(self, other_graph): def union(self, other_graph):

View File

@ -21,9 +21,8 @@ OPTS = [
'datasources to the persistor'), 'datasources to the persistor'),
cfg.BoolOpt('enable_persistency', cfg.BoolOpt('enable_persistency',
default=False, default=False,
help='Periodically store the entire graph snapshot to ' help='Periodically store entity graph snapshot to database'),
'the database'),
cfg.IntOpt('graph_persistency_interval', cfg.IntOpt('graph_persistency_interval',
default=3600, default=3600,
help='Store the graph to the database every X seconds'), help='Store graph to database every X seconds'),
] ]

View File

@ -36,7 +36,7 @@ class GraphPersistor(object):
def store_graph(self, graph): def store_graph(self, graph):
try: try:
graph_snapshot = graph.to_json() graph_snapshot = graph.write_gpickle()
db_row = models.GraphSnapshot( db_row = models.GraphSnapshot(
last_event_timestamp=self.last_event_timestamp, last_event_timestamp=self.last_event_timestamp,
graph_snapshot=graph_snapshot) graph_snapshot=graph_snapshot)
@ -47,7 +47,7 @@ class GraphPersistor(object):
def load_graph(self, timestamp=None): def load_graph(self, timestamp=None):
db_row = self.db_connection.graph_snapshots.query(timestamp) if \ db_row = self.db_connection.graph_snapshots.query(timestamp) if \
timestamp else self.db_connection.graph_snapshots.query(utcnow()) timestamp else self.db_connection.graph_snapshots.query(utcnow())
return NXGraph.from_json(db_row.graph_snapshot) if db_row else None return NXGraph.read_gpickle(db_row.graph_snapshot) if db_row else None
def delete_graph_snapshots(self, timestamp): def delete_graph_snapshots(self, timestamp):
"""Deletes all graph snapshots until timestamp""" """Deletes all graph snapshots until timestamp"""

View File

@ -277,7 +277,7 @@ class EventsConnection(base.EventsConnection, BaseTableConn):
lt_collector_timestamp, lt_collector_timestamp,
query) query)
return query.all() return query.order_by(models.Event.collector_timestamp.desc()).all()
@staticmethod @staticmethod
def _update_query_gt_lt(gt_collector_timestamp, def _update_query_gt_lt(gt_collector_timestamp,

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import json import json
import zlib
from oslo_db.sqlalchemy import models from oslo_db.sqlalchemy import models
@ -49,14 +50,23 @@ class JSONEncodedDict(types.TypeDecorator):
impl = types.TEXT impl = types.TEXT
def process_bind_param(self, value, dialect): def process_bind_param(self, value, dialect):
if value is not None: return json.dumps(value) if value else None
value = json.dumps(value)
return value
def process_result_value(self, value, dialect): def process_result_value(self, value, dialect):
if value is not None: return json.loads(value) if value else None
value = json.loads(value)
return value
class CompressedBinary(types.TypeDecorator):
impl = types.LargeBinary
def process_bind_param(self, value, dialect):
return zlib.compress(value) if value else None
def process_result_value(self, value, dialect):
return zlib.decompress(value) if value else None
def copy(self, **kwargs):
return CompressedBinary(self.impl.length)
class Event(Base): class Event(Base):
@ -125,7 +135,7 @@ class GraphSnapshot(Base):
__tablename__ = 'graph_snapshots' __tablename__ = 'graph_snapshots'
last_event_timestamp = Column(DateTime, primary_key=True, nullable=False) last_event_timestamp = Column(DateTime, primary_key=True, nullable=False)
graph_snapshot = Column(JSONEncodedDict(), nullable=False) graph_snapshot = Column(CompressedBinary((2 ** 32) - 1), nullable=False)
def __repr__(self): def __repr__(self):
return \ return \