From edd80c26f02ff737565d0359aad07564ce98cc16 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Mon, 9 Jun 2014 16:19:28 +0400 Subject: [PATCH] Minimize memory consumption in dashboard Memory consumption optimized from 2.7G down to 1.5G on full dataset: * Record index is not necessary in runtime storage since it is needed for update operations only * Store compact records as tuples instead of dicts Co-author: Yuriy Taraday Change-Id: I9a361e9373b5ed4fada527959a604cf09a02e5b9 --- dashboard/vault.py | 32 +++++++++++++++++++---- stackalytics/processor/runtime_storage.py | 9 ++++--- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/dashboard/vault.py b/dashboard/vault.py index 3c68f0ee5..2113a9139 100644 --- a/dashboard/vault.py +++ b/dashboard/vault.py @@ -13,9 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import os +import UserDict import flask +import itertools from oslo.config import cfg import six @@ -34,15 +37,34 @@ RECORD_FIELDS_FOR_AGGREGATE = ['record_id', 'primary_key', 'record_type', 'disagreement', 'value', 'status', 'blueprint_id'] +_CompactRecordTuple = collections.namedtuple('CompactRecord', RECORD_FIELDS_FOR_AGGREGATE) + + +class CompactRecord(_CompactRecordTuple, UserDict.DictMixin): __slots__ = () def __getitem__(self, key): if isinstance(key, str): return getattr(self, key) else: return super(CompactRecord, self).__getitem__(key) def keys(self): return RECORD_FIELDS_FOR_AGGREGATE def has_key(self, key): return key in RECORD_FIELDS_FOR_AGGREGATE def iteritems(self): return itertools.izip(RECORD_FIELDS_FOR_AGGREGATE, self) def compact_records(records): for record in records: - compact = dict([(k, record[k]) for k in RECORD_FIELDS_FOR_AGGREGATE if k in record]) - yield compact + compact = dict((k, record.get(k)) for k in RECORD_FIELDS_FOR_AGGREGATE) - if 'blueprint_id' in compact: del compact['blueprint_id'] + yield 
CompactRecord(**compact) def extend_record(record): diff --git a/stackalytics/processor/runtime_storage.py b/stackalytics/processor/runtime_storage.py index 4353c29df..ce48e52f8 100644 --- a/stackalytics/processor/runtime_storage.py +++ b/stackalytics/processor/runtime_storage.py @@ -65,17 +65,19 @@ class MemcachedStorage(RuntimeStorage): if stripped: storage_uri = stripped.split(',') self.memcached = memcache.Client(storage_uri) - self._build_index() self._init_user_count() + self.record_index = {} else: raise Exception('Invalid storage uri %s' % uri) - def _build_index(self): - self.record_index = {} + def _build_index_lazily(self): + if self.record_index: + return for record in self.get_all_records(): self.record_index[record['primary_key']] = record['record_id'] def set_records(self, records_iterator, merge_handler=None): + self._build_index_lazily() for record in records_iterator: if record['primary_key'] in self.record_index: # update @@ -103,6 +105,7 @@ class MemcachedStorage(RuntimeStorage): self._commit_update(record_id) def apply_corrections(self, corrections_iterator): + self._build_index_lazily() for correction in corrections_iterator: if correction['primary_key'] not in self.record_index: continue