Users are kept in runtime storage during update cycle

* Users are persisted by user_id and accessed directly from the web front end
* Web app initialization section is refactored
* 404 error handler page is made to look more like an error page

Closes bug 1213841

Change-Id: Ie6ff57505dcc14291cd3267a7448cddb2b478eb2
This commit is contained in:
Ilya Shakhat
2013-08-19 16:57:18 +04:00
parent 662a328cb8
commit 5a7da39e7d
7 changed files with 117 additions and 86 deletions

View File

@@ -17,13 +17,13 @@ MEMORY_STORAGE_CACHED = 0
class MemoryStorage(object): class MemoryStorage(object):
def __init__(self, records): def __init__(self):
pass pass
class CachedMemoryStorage(MemoryStorage): class CachedMemoryStorage(MemoryStorage):
def __init__(self, records): def __init__(self):
super(CachedMemoryStorage, self).__init__(records) super(CachedMemoryStorage, self).__init__()
# common indexes # common indexes
self.records = {} self.records = {}
@@ -43,25 +43,28 @@ class CachedMemoryStorage(MemoryStorage):
'release': self.release_index, 'release': self.release_index,
} }
for record in records:
self._save_record(record)
self.company_name_mapping = dict((c.lower(), c)
for c in self.company_index.keys())
def _save_record(self, record): def _save_record(self, record):
self.records[record['record_id']] = record self.records[record['record_id']] = record
for key, index in self.indexes.iteritems(): for key, index in self.indexes.iteritems():
self._add_to_index(index, record, key) self._add_to_index(index, record, key)
def update(self, records): def update(self, records):
have_updates = False
for record in records: for record in records:
have_updates = True
record_id = record['record_id'] record_id = record['record_id']
if record_id in self.records: if record_id in self.records:
# remove existing record from indexes # remove existing record from indexes
self._remove_record_from_index(self.records[record_id]) self._remove_record_from_index(self.records[record_id])
self._save_record(record) self._save_record(record)
if have_updates:
self.company_name_mapping = dict(
(c.lower(), c) for c in self.company_index.keys())
return have_updates
def _remove_record_from_index(self, record): def _remove_record_from_index(self, record):
for key, index in self.indexes.iteritems(): for key, index in self.indexes.iteritems():
index[record[key]].remove(record['record_id']) index[record[key]].remove(record['record_id'])
@@ -134,8 +137,8 @@ class CachedMemoryStorage(MemoryStorage):
return self.user_id_index.keys() return self.user_id_index.keys()
def get_memory_storage(memory_storage_type, records): def get_memory_storage(memory_storage_type):
if memory_storage_type == MEMORY_STORAGE_CACHED: if memory_storage_type == MEMORY_STORAGE_CACHED:
return CachedMemoryStorage(records) return CachedMemoryStorage()
else: else:
raise Exception('Unknown memory storage type %s' % memory_storage_type) raise Exception('Unknown memory storage type %s' % memory_storage_type)

View File

@@ -1,19 +1,15 @@
{% extends "layout.html" %} {% extends "base.html" %}
{% block title %} {% block head %}
404 <meta http-equiv="refresh" content="5; url=/">
{% endblock %} {% endblock %}
{% block left_frame %} {% block body %}
<h2>404 Not Found</h2> <h2>404 Not Found</h2>
<div>The requested page is not found. Return to <a href="/">Overview</a> <div>The requested page is not found. The page will be automatically redirected to <a href="/">Overview</a>
</div> </div>
{% endblock %} {% endblock %}
{% block right_frame %}
{% endblock %}

View File

@@ -75,36 +75,33 @@ else:
def get_vault(): def get_vault():
vault = getattr(app, 'stackalytics_vault', None) vault = getattr(app, 'stackalytics_vault', None)
if not vault: if not vault:
try:
vault = {} vault = {}
runtime_storage_inst = runtime_storage.get_runtime_storage( runtime_storage_inst = runtime_storage.get_runtime_storage(
cfg.CONF.runtime_storage_uri) cfg.CONF.runtime_storage_uri)
vault['runtime_storage'] = runtime_storage_inst vault['runtime_storage'] = runtime_storage_inst
vault['memory_storage'] = memory_storage.get_memory_storage( vault['memory_storage'] = memory_storage.get_memory_storage(
memory_storage.MEMORY_STORAGE_CACHED, memory_storage.MEMORY_STORAGE_CACHED)
vault['runtime_storage'].get_update(os.getpid()))
releases = list(runtime_storage_inst.get_by_key('releases'))
vault['start_date'] = releases[0]['end_date']
vault['end_date'] = releases[-1]['end_date']
start_date = releases[0]['end_date']
for r in releases[1:]:
r['start_date'] = start_date
start_date = r['end_date']
vault['releases'] = dict((r['release_name'].lower(), r)
for r in releases[1:])
modules = runtime_storage_inst.get_by_key('repos')
vault['modules'] = dict((r['module'].lower(),
r['project_type'].lower()) for r in modules)
app.stackalytics_vault = vault
init_project_types(vault) init_project_types(vault)
else: init_releases(vault)
app.stackalytics_vault = vault
except Exception as e:
LOG.critical('Failed to initialize application: %s', e)
LOG.exception(e)
flask.abort(500)
if not getattr(flask.request, 'stackalytics_updated', None): if not getattr(flask.request, 'stackalytics_updated', None):
flask.request.stackalytics_updated = True flask.request.stackalytics_updated = True
memory_storage_inst = vault['memory_storage'] memory_storage_inst = vault['memory_storage']
memory_storage_inst.update( have_updates = memory_storage_inst.update(
vault['runtime_storage'].get_update(os.getpid())) vault['runtime_storage'].get_update(os.getpid()))
if have_updates:
init_project_types(vault)
init_releases(vault)
return vault return vault
@@ -112,12 +109,27 @@ def get_memory_storage():
return get_vault()['memory_storage'] return get_vault()['memory_storage']
def init_releases(vault):
runtime_storage_inst = vault['runtime_storage']
releases = runtime_storage_inst.get_by_key('releases')
if not releases:
raise Exception('Releases are missing in runtime storage')
vault['start_date'] = releases[0]['end_date']
vault['end_date'] = releases[-1]['end_date']
start_date = releases[0]['end_date']
for r in releases[1:]:
r['start_date'] = start_date
start_date = r['end_date']
vault['releases'] = dict((r['release_name'].lower(), r)
for r in releases[1:])
def init_project_types(vault): def init_project_types(vault):
runtime_storage_inst = vault['runtime_storage'] runtime_storage_inst = vault['runtime_storage']
project_type_options = {} project_type_options = {}
project_type_group_index = {'all': set()} project_type_group_index = {'all': set()}
for repo in runtime_storage_inst.get_by_key('repos'): for repo in runtime_storage_inst.get_by_key('repos') or []:
project_type = repo['project_type'].lower() project_type = repo['project_type'].lower()
project_group = None project_group = None
if ('project_group' in repo) and (repo['project_group']): if ('project_group' in repo) and (repo['project_group']):
@@ -176,6 +188,11 @@ def is_project_type_valid(project_type):
return False return False
def get_user_from_runtime_storage(user_id):
runtime_storage_inst = get_vault()['runtime_storage']
return runtime_storage_inst.get_by_key('user:%s' % user_id)
# Utils --------- # Utils ---------
def get_default(param_name): def get_default(param_name):
@@ -209,12 +226,6 @@ def get_single_parameter(kwargs, singular_name, use_default=True):
return '' return ''
def validate_user_id(user_id):
runtime_storage_inst = get_vault()['runtime_storage']
users_index = runtime_storage_inst.get_by_key('users')
return user_id in users_index
# Decorators --------- # Decorators ---------
def record_filter(ignore=None, use_default=True): def record_filter(ignore=None, use_default=True):
@@ -250,7 +261,7 @@ def record_filter(ignore=None, use_default=True):
if 'user_id' not in ignore: if 'user_id' not in ignore:
param = get_parameter(kwargs, 'user_id', 'user_ids') param = get_parameter(kwargs, 'user_id', 'user_ids')
param = [u for u in param if validate_user_id(u)] param = [u for u in param if get_user_from_runtime_storage(u)]
if param: if param:
record_ids &= ( record_ids &= (
memory_storage.get_record_ids_by_user_ids(param)) memory_storage.get_record_ids_by_user_ids(param))
@@ -674,21 +685,19 @@ def get_users_json(records):
@app.route('/data/users/<user_id>.json') @app.route('/data/users/<user_id>.json')
def get_user(user_id): def get_user(user_id):
runtime_storage_inst = get_vault()['runtime_storage'] user = get_user_from_runtime_storage(user_id)
users_index = runtime_storage_inst.get_by_key('users') if not user:
if user_id in users_index: flask.abort(404)
res = users_index[user_id].copy() user['id'] = user['user_id']
res['id'] = res['user_id'] user['text'] = user['user_name']
res['text'] = res['user_name'] if user['companies']:
if res['companies']: company_name = user['companies'][-1]['company_name']
company_name = res['companies'][-1]['company_name'] user['company_link'] = make_link(
res['company_link'] = make_link(
company_name, '/', {'company': company_name}) company_name, '/', {'company': company_name})
else: else:
res['company_link'] = '' user['company_link'] = ''
res['gravatar'] = gravatar(res['emails'][0]) user['gravatar'] = gravatar(user['emails'][0])
return json.dumps({'user': res}) return json.dumps({'user': user})
return json.dumps({})
@app.route('/data/timeline') @app.route('/data/timeline')

View File

@@ -76,24 +76,25 @@ def _retrieve_project_list(runtime_storage_inst, project_sources):
runtime_storage_inst.set_by_key('repos', stored_repos) runtime_storage_inst.set_by_key('repos', stored_repos)
def _process_users(users): def _process_users(runtime_storage_inst, users):
users_index = {} users_index = {}
for user in users: for user in users:
runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user)
if 'user_id' in user: if 'user_id' in user:
users_index[user['user_id']] = user users_index[user['user_id']] = user
if 'launchpad_id' in user: if 'launchpad_id' in user:
users_index[user['launchpad_id']] = user users_index[user['launchpad_id']] = user
for email in user['emails']: for email in user['emails']:
users_index[email] = user users_index[email] = user
return users_index runtime_storage_inst.set_by_key('users', users_index)
def _process_companies(companies): def _process_companies(runtime_storage_inst, companies):
domains_index = {} domains_index = {}
for company in companies: for company in companies:
for domain in company['domains']: for domain in company['domains']:
domains_index[domain] = company['company_name'] domains_index[domain] = company['company_name']
return domains_index runtime_storage_inst.set_by_key('companies', domains_index)
KEYS = { KEYS = {
@@ -105,15 +106,16 @@ KEYS = {
def _update_default_data(runtime_storage_inst, default_data): def _update_default_data(runtime_storage_inst, default_data):
LOG.debug('Update runtime storage with default data')
for key, processor in KEYS.iteritems(): for key, processor in KEYS.iteritems():
if processor: if processor:
value = processor(default_data[key]) processor(runtime_storage_inst, default_data[key])
else: else:
value = default_data[key] runtime_storage_inst.set_by_key(key, default_data[key])
runtime_storage_inst.set_by_key(key, value)
def process(runtime_storage_inst, default_data, sources_root): def process(runtime_storage_inst, default_data, sources_root):
LOG.debug('Process default data')
normalizer.normalize_default_data(default_data) normalizer.normalize_default_data(default_data)

View File

@@ -71,6 +71,9 @@ class RecordProcessor(object):
LOG.debug('Create new user: %s', user) LOG.debug('Create new user: %s', user)
return user return user
def _store_user(self, user):
self.runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user)
def _get_lp_info(self, email): def _get_lp_info(self, email):
lp_profile = None lp_profile = None
if not re.match(r'[\w\d_\.-]+@([\w\d_\.-]+\.)+[\w]+', email): if not re.match(r'[\w\d_\.-]+@([\w\d_\.-]+\.)+[\w]+', email):
@@ -115,6 +118,7 @@ class RecordProcessor(object):
user_name = record['author_name'] user_name = record['author_name']
user = self._create_user(launchpad_id, email, user_name) user = self._create_user(launchpad_id, email, user_name)
self._store_user(user)
self.users_index[email] = user self.users_index[email] = user
if user['launchpad_id']: if user['launchpad_id']:
self.users_index[user['launchpad_id']] = user self.users_index[user['launchpad_id']] = user

View File

@@ -115,10 +115,10 @@ class MemcachedStorage(RuntimeStorage):
self._commit_update(record_id) self._commit_update(record_id)
def get_by_key(self, key): def get_by_key(self, key):
return self.memcached.get(key) return self.memcached.get(key.encode('utf8'))
def set_by_key(self, key, value): def set_by_key(self, key, value):
self.memcached.set(key, value) self.memcached.set(key.encode('utf8'), value)
def get_update(self, pid): def get_update(self, pid):
last_update = self.memcached.get('pid:%s' % pid) last_update = self.memcached.get('pid:%s' % pid)

View File

@@ -16,7 +16,6 @@
import mock import mock
import testtools import testtools
from stackalytics.processor import default_data_processor
from stackalytics.processor import record_processor from stackalytics.processor import record_processor
from stackalytics.processor import runtime_storage from stackalytics.processor import runtime_storage
from stackalytics.processor import utils from stackalytics.processor import utils
@@ -25,6 +24,26 @@ from stackalytics.processor import utils
LP_URI = 'https://api.launchpad.net/1.0/people/?ws.op=getByEmail&email=%s' LP_URI = 'https://api.launchpad.net/1.0/people/?ws.op=getByEmail&email=%s'
def _make_users(users):
users_index = {}
for user in users:
if 'user_id' in user:
users_index[user['user_id']] = user
if 'launchpad_id' in user:
users_index[user['launchpad_id']] = user
for email in user['emails']:
users_index[email] = user
return users_index
def _make_companies(companies):
domains_index = {}
for company in companies:
for domain in company['domains']:
domains_index[domain] = company['company_name']
return domains_index
class TestRecordProcessor(testtools.TestCase): class TestRecordProcessor(testtools.TestCase):
def setUp(self): def setUp(self):
super(TestRecordProcessor, self).setUp() super(TestRecordProcessor, self).setUp()
@@ -77,9 +96,9 @@ class TestRecordProcessor(testtools.TestCase):
def get_by_key(table): def get_by_key(table):
if table == 'companies': if table == 'companies':
return default_data_processor._process_companies(companies) return _make_companies(companies)
elif table == 'users': elif table == 'users':
return default_data_processor._process_users(self.get_users()) return _make_users(self.get_users())
elif table == 'releases': elif table == 'releases':
return releases return releases
else: else:
@@ -160,8 +179,7 @@ class TestRecordProcessor(testtools.TestCase):
commit = list(self.commit_processor.process(commit_generator))[0] commit = list(self.commit_processor.process(commit_generator))[0]
self.runtime_storage.set_by_key.assert_called_once_with('users', self.runtime_storage.set_by_key.assert_called_with('users', mock.ANY)
mock.ANY)
self.read_json.assert_called_once_with(LP_URI % email) self.read_json.assert_called_once_with(LP_URI % email)
self.assertIn(email, user['emails']) self.assertIn(email, user['emails'])
self.assertEquals('NEC', commit['company_name']) self.assertEquals('NEC', commit['company_name'])
@@ -183,8 +201,7 @@ class TestRecordProcessor(testtools.TestCase):
commit = list(self.commit_processor.process(commit_generator))[0] commit = list(self.commit_processor.process(commit_generator))[0]
self.runtime_storage.set_by_key.assert_called_once_with('users', self.runtime_storage.set_by_key.assert_called_with('users', mock.ANY)
mock.ANY)
self.read_json.assert_called_once_with(LP_URI % email) self.read_json.assert_called_once_with(LP_URI % email)
self.assertIn(email, user['emails']) self.assertIn(email, user['emails'])
self.assertEquals('SuperCompany', commit['company_name']) self.assertEquals('SuperCompany', commit['company_name'])