diff --git a/stackalytics/processor/dump.py b/stackalytics/processor/dump.py index e6bf055ce..c5b467c92 100644 --- a/stackalytics/processor/dump.py +++ b/stackalytics/processor/dump.py @@ -52,12 +52,16 @@ def import_data(runtime_storage_inst, fd): if len(bucket) < runtime_storage.RECORD_ID_PREFIX: bucket[record['record_id']] = record else: - runtime_storage_inst.memcached.set_multi( - bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX) + if not runtime_storage_inst.memcached.set_multi( + bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX): + LOG.critical('Failed to set_multi in memcached') + raise Exception('Failed to set_multi in memcached') bucket = {} if bucket: - runtime_storage_inst.memcached.set_multi( - bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX) + if not runtime_storage_inst.memcached.set_multi( + bucket, key_prefix=runtime_storage.RECORD_ID_PREFIX): + LOG.critical('Failed to set_multi in memcached') + raise Exception('Failed to set_multi in memcached') runtime_storage_inst._set_record_count(count) diff --git a/stackalytics/processor/runtime_storage.py b/stackalytics/processor/runtime_storage.py index f10c25547..4353c29df 100644 --- a/stackalytics/processor/runtime_storage.py +++ b/stackalytics/processor/runtime_storage.py @@ -83,22 +83,21 @@ class MemcachedStorage(RuntimeStorage): if not merge_handler: record['record_id'] = record_id LOG.debug('Update record %s', record) - self.memcached.set(self._get_record_name(record_id), - record) + self.set_by_key(self._get_record_name(record_id), record) else: - original = self.memcached.get(self._get_record_name( + original = self.get_by_key(self._get_record_name( record_id)) if merge_handler(original, record): LOG.debug('Update record with merge %s', record) - self.memcached.set(self._get_record_name(record_id), - original) + self.set_by_key(self._get_record_name(record_id), + original) else: # insert record record_id = self._get_record_count() record['record_id'] = record_id self.record_index[record['primary_key']] = record_id LOG.debug('Insert new record %s', record) - self.memcached.set(self._get_record_name(record_id), record) + self.set_by_key(self._get_record_name(record_id), record) self._set_record_count(record_id + 1) self._commit_update(record_id) @@ -109,7 +108,7 @@ class MemcachedStorage(RuntimeStorage): continue record_id = self.record_index[correction['primary_key']] - original = self.memcached.get(self._get_record_name(record_id)) + original = self.get_by_key(self._get_record_name(record_id)) need_update = False for field, value in six.iteritems(correction): @@ -118,7 +117,7 @@ class MemcachedStorage(RuntimeStorage): original[field] = value if need_update: - self.memcached.set(self._get_record_name(record_id), original) + self.set_by_key(self._get_record_name(record_id), original) self._commit_update(record_id) def inc_user_count(self): @@ -134,16 +133,22 @@ class MemcachedStorage(RuntimeStorage): return self.memcached.get(key.encode('utf8')) def set_by_key(self, key, value): - self.memcached.set(key.encode('utf8'), value) + if not self.memcached.set(key.encode('utf8'), value): + LOG.critical('Failed to store data in memcached: ' + 'key %(key)s, value %(value)s', + {'key': key, 'value': value}) + raise Exception('Memcached set failed') def delete_by_key(self, key): - self.memcached.delete(key.encode('utf8')) + if not self.memcached.delete(key.encode('utf8')): + LOG.critical('Failed to delete data from memcached: key %s', key) + raise Exception('Memcached delete failed') def get_update(self, pid): - last_update = self.memcached.get('pid:%s' % pid) + last_update = self.get_by_key('pid:%s' % pid) update_count = self._get_update_count() - self.memcached.set('pid:%s' % pid, update_count) + self.set_by_key('pid:%s' % pid, update_count) self._set_pids(pid) if not last_update: @@ -159,23 +164,23 @@ class MemcachedStorage(RuntimeStorage): yield i def active_pids(self, pids): - stored_pids = self.memcached.get('pids') or set() + stored_pids = self.get_by_key('pids') or set() for pid in stored_pids: if pid not in pids: LOG.debug('Purge dead uwsgi pid %s from pids list', pid) - self.memcached.delete('pid:%s' % pid) + self.delete_by_key('pid:%s' % pid) - self.memcached.set('pids', pids) + self.set_by_key('pids', pids) # remove unneeded updates min_update = self._get_update_count() for pid in pids: - n = self.memcached.get('pid:%s' % pid) + n = self.get_by_key('pid:%s' % pid) if n: if n < min_update: min_update = n - first_valid_update = self.memcached.get('first_valid_update') or 0 + first_valid_update = self.get_by_key('first_valid_update') or 0 LOG.debug('Purge polled updates from %(first)s to %(min)s', {'first': first_valid_update, 'min': min_update}) @@ -183,28 +188,29 @@ class MemcachedStorage(RuntimeStorage): BULK_DELETE_SIZE): if not self.memcached.delete_multi(delete_id_set, key_prefix=UPDATE_ID_PREFIX): - raise Exception('Failed to delete from memcache') + LOG.critical('Failed to delete_multi from memcached') + raise Exception('Failed to delete_multi from memcached') - self.memcached.set('first_valid_update', min_update) + self.set_by_key('first_valid_update', min_update) def _get_update_count(self): - return self.memcached.get('update:count') or 0 + return self.get_by_key('update:count') or 0 def _set_pids(self, pid): - pids = self.memcached.get('pids') or set() + pids = self.get_by_key('pids') or set() if pid in pids: return pids.add(pid) - self.memcached.set('pids', pids) + self.set_by_key('pids', pids) def _get_record_name(self, record_id): return RECORD_ID_PREFIX + str(record_id) def _get_record_count(self): - return self.memcached.get('record:count') or 0 + return self.get_by_key('record:count') or 0 def _set_record_count(self, count): - self.memcached.set('record:count', count) + self.set_by_key('record:count', count) def get_all_records(self): for record_id_set in utils.make_range(0, self._get_record_count(), @@ -215,16 +221,16 @@ class MemcachedStorage(RuntimeStorage): def _commit_update(self, record_id): count = self._get_update_count() - self.memcached.set(UPDATE_ID_PREFIX + str(count), record_id) - self.memcached.set('update:count', count + 1) + self.set_by_key(UPDATE_ID_PREFIX + str(count), record_id) + self.set_by_key('update:count', count + 1) def _init_user_count(self): - if not self.memcached.get('user:count'): - self.memcached.set('user:count', 1) + if not self.get_by_key('user:count'): + self.set_by_key('user:count', 1) def get_runtime_storage(uri): - LOG.debug('Runtime storage is requested for uri %s' % uri) + LOG.debug('Runtime storage is requested for uri %s', uri) match = re.search(MEMCACHED_URI_PREFIX, uri) if match: return MemcachedStorage(uri)