Files
stackalytics/stackalytics/processor/runtime_storage.py
Ilya Shakhat 9f573b8fc7 Revert "Add support for user correction"
This reverts commit bac722a749.

There are 2 user profiles that mix with each other because of the same
email re-used by other person. The proposed change was intended to fix
this, but it does not. All it does is update of user profile after
record processing, but at that moment records already refer to
the wrong person. The correct solution is to somehow restrict usage
of email (e.g. by date) or at least restrict mapping of particular
user profiles.

Change-Id: I1c37bd9778d740d5095f08885bb3cb63a656a604
2017-04-10 09:58:23 +00:00

248 lines
8.4 KiB
Python

# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import memcache
from oslo_log import log as logging
import six
from stackalytics.processor import utils
LOG = logging.getLogger(__name__)
BULK_READ_SIZE = 64
BULK_DELETE_SIZE = 4096
RECORD_ID_PREFIX = 'record:'
UPDATE_ID_PREFIX = 'update:'
MEMCACHED_URI_PREFIX = r'^memcached:\/\/'
class RuntimeStorage(object):
def __init__(self, uri):
pass
def set_records(self, records_iterator):
pass
def apply_corrections(self, corrections_iterator):
pass
def get_by_key(self, key):
pass
def set_by_key(self, key, value):
pass
def get_update(self, pid):
pass
def active_pids(self, pids):
pass
def get_all_records(self):
pass
class MemcachedStorage(RuntimeStorage):
def __init__(self, uri):
super(MemcachedStorage, self).__init__(uri)
stripped = re.sub(MEMCACHED_URI_PREFIX, '', uri)
if stripped:
storage_uri = stripped.split(',')
self.memcached = memcache.Client(storage_uri)
self._init_user_count()
self.record_index = {}
else:
raise Exception('Invalid storage uri %s' % uri)
def _build_index_lazily(self):
if self.record_index:
return
for record in self.get_all_records():
self.record_index[record['primary_key']] = record['record_id']
def set_records(self, records_iterator, merge_handler=None):
self._build_index_lazily()
for record in records_iterator:
if record['primary_key'] in self.record_index:
# update
record_id = self.record_index[record['primary_key']]
if not merge_handler:
record['record_id'] = record_id
LOG.debug('Update record %s', record)
self.set_by_key(self._get_record_name(record_id), record)
else:
original = self.get_by_key(self._get_record_name(
record_id))
if merge_handler(original, record):
LOG.debug('Update record with merge %s', record)
self.set_by_key(self._get_record_name(record_id),
original)
else:
# insert record
record_id = self._get_record_count()
record['record_id'] = record_id
self.record_index[record['primary_key']] = record_id
LOG.debug('Insert new record %s', record)
self.set_by_key(self._get_record_name(record_id), record)
self._set_record_count(record_id + 1)
self._commit_update(record_id)
def apply_corrections(self, corrections_iterator):
self._build_index_lazily()
for correction in corrections_iterator:
if correction['primary_key'] not in self.record_index:
continue
record_id = self.record_index[correction['primary_key']]
original = self.get_by_key(self._get_record_name(record_id))
need_update = False
for field, value in six.iteritems(correction):
if (field not in original) or (original[field] != value):
need_update = True
original[field] = value
if need_update:
self.set_by_key(self._get_record_name(record_id), original)
self._commit_update(record_id)
def inc_user_count(self):
return self.memcached.incr('user:count')
def get_all_users(self):
for n in six.moves.range(0, self.get_by_key('user:count') + 1):
user = self.get_by_key('user:%s' % n)
if user:
yield user
def get_by_key(self, key):
if six.PY2:
key = key.encode('utf8')
return self.memcached.get(key)
def set_by_key(self, key, value):
if six.PY2:
key = key.encode('utf8')
if not self.memcached.set(key, value):
LOG.critical('Failed to store data in memcached: '
'key %(key)s, value %(value)s',
{'key': key, 'value': value})
raise Exception('Memcached set failed')
def delete_by_key(self, key):
if six.PY2:
key = key.encode('utf8')
if not self.memcached.delete(key):
LOG.critical('Failed to delete data from memcached: key %s', key)
raise Exception('Memcached delete failed')
def get_update(self, pid):
last_update = self.get_by_key('pid:%s' % pid)
update_count = self._get_update_count()
self.set_by_key('pid:%s' % pid, update_count)
self._set_pids(pid)
if not last_update:
for i in self.get_all_records():
yield i
else:
for update_id_set in utils.make_range(last_update, update_count,
BULK_READ_SIZE):
update_set = self.memcached.get_multi(
update_id_set, UPDATE_ID_PREFIX).values()
for i in self.memcached.get_multi(
update_set, RECORD_ID_PREFIX).values():
yield i
def active_pids(self, pids):
stored_pids = self.get_by_key('pids') or set()
for pid in stored_pids:
if pid not in pids:
LOG.debug('Purge dead uwsgi pid %s from pids list', pid)
self.delete_by_key('pid:%s' % pid)
self.set_by_key('pids', pids)
# remove unneeded updates
min_update = self._get_update_count()
for pid in pids:
n = self.get_by_key('pid:%s' % pid)
if n:
if n < min_update:
min_update = n
first_valid_update = self.get_by_key('first_valid_update') or 0
LOG.debug('Purge polled updates from %(first)s to %(min)s',
{'first': first_valid_update, 'min': min_update})
for delete_id_set in utils.make_range(first_valid_update, min_update,
BULK_DELETE_SIZE):
if not self.memcached.delete_multi(delete_id_set,
key_prefix=UPDATE_ID_PREFIX):
LOG.critical('Failed to delete_multi from memcached')
raise Exception('Failed to delete_multi from memcached')
self.set_by_key('first_valid_update', min_update)
def _get_update_count(self):
return self.get_by_key('update:count') or 0
def _set_pids(self, pid):
pids = self.get_by_key('pids') or set()
if pid in pids:
return
pids.add(pid)
self.set_by_key('pids', pids)
def _get_record_name(self, record_id):
return RECORD_ID_PREFIX + str(record_id)
def _get_record_count(self):
return self.get_by_key('record:count') or 0
def _set_record_count(self, count):
self.set_by_key('record:count', count)
def get_all_records(self):
for record_id_set in utils.make_range(0, self._get_record_count(),
BULK_READ_SIZE):
for i in self.memcached.get_multi(
record_id_set, RECORD_ID_PREFIX).values():
yield i
def _commit_update(self, record_id):
count = self._get_update_count()
self.set_by_key(UPDATE_ID_PREFIX + str(count), record_id)
self.set_by_key('update:count', count + 1)
def _init_user_count(self):
if not self.get_by_key('user:count'):
self.set_by_key('user:count', 1)
def get_runtime_storage(uri):
LOG.debug('Runtime storage is requested for uri %s', uri)
match = re.search(MEMCACHED_URI_PREFIX, uri)
if match:
return MemcachedStorage(uri)
else:
raise Exception('Unknown runtime storage uri %s' % uri)