Move user store operations into separate module

Change-Id: I7f81f48ff6415f15125ad58ef15a1f7ed7f91678
This commit is contained in:
Ilya Shakhat 2014-10-06 13:07:38 +04:00
parent 8ab9919af4
commit 588303ab28
7 changed files with 142 additions and 87 deletions

View File

@ -24,6 +24,7 @@ import six
from stackalytics.dashboard import memory_storage
from stackalytics.openstack.common import log as logging
from stackalytics.processor import runtime_storage
from stackalytics.processor import user_processor
from stackalytics.processor import utils
@ -197,7 +198,8 @@ def get_user_from_runtime_storage(user_id):
runtime_storage_inst = get_vault()['runtime_storage']
user_index = get_vault()['user_index']
if user_id not in user_index:
user_index[user_id] = utils.load_user(runtime_storage_inst, user_id)
user_index[user_id] = user_processor.load_user(
runtime_storage_inst, user_id=user_id)
return user_index[user_id]

View File

@ -21,6 +21,7 @@ import six
from stackalytics.openstack.common import log as logging
from stackalytics.processor import normalizer
from stackalytics.processor import user_processor
from stackalytics.processor import utils
LOG = logging.getLogger(__name__)
@ -136,12 +137,13 @@ def _update_with_driverlog_data(default_data, driverlog_data_uri):
def _store_users(runtime_storage_inst, users):
for user in users:
stored_user = utils.load_user(runtime_storage_inst, user['user_id'])
stored_user = user_processor.load_user(runtime_storage_inst,
user_id=user['user_id'])
if stored_user:
stored_user.update(user)
user = stored_user
user['static'] = True
utils.store_user(runtime_storage_inst, user)
user_processor.store_user(runtime_storage_inst, user)
def _store_companies(runtime_storage_inst, companies):

View File

@ -16,6 +16,7 @@
import six
from stackalytics.openstack.common import log as logging
from stackalytics.processor import user_processor
from stackalytics.processor import utils
@ -36,7 +37,9 @@ def _normalize_user(user):
return x["end_date"] - y["end_date"]
user['companies'].sort(key=utils.cmp_to_key(end_date_comparator))
user['user_id'] = user['launchpad_id']
user['user_id'] = user_processor.make_user_id(
launchpad_id=user.get('launchpad_id'),
email=user.get('email'))
def _normalize_users(users):

View File

@ -22,6 +22,7 @@ import six
from stackalytics.openstack.common import log as logging
from stackalytics.processor import launchpad_utils
from stackalytics.processor import user_processor
from stackalytics.processor import utils
@ -98,7 +99,8 @@ class RecordProcessor(object):
company = (self._get_company_by_email(email) or
self._get_independent())
user = {
'user_id': launchpad_id or email,
'user_id': user_processor.make_user_id(
email=email, launchpad_id=launchpad_id),
'launchpad_id': launchpad_id,
'user_name': user_name or '',
'companies': [{
@ -204,12 +206,14 @@ class RecordProcessor(object):
for u in user_profiles:
if u.get('seq') in seqs:
LOG.debug('Delete user: %s', u)
utils.delete_user(self.runtime_storage_inst, u)
user_processor.delete_user(
self.runtime_storage_inst, u)
return merged_user
def update_user(self, record):
email = record.get('author_email')
user_e = utils.load_user(self.runtime_storage_inst, email) or {}
user_e = user_processor.load_user(
self.runtime_storage_inst, email=email) or {}
user_name = record.get('author_name')
launchpad_id = record.get('launchpad_id')
@ -219,7 +223,8 @@ class RecordProcessor(object):
if lp_user_name:
user_name = lp_user_name
user_l = utils.load_user(self.runtime_storage_inst, launchpad_id) or {}
user_l = user_processor.load_user(
self.runtime_storage_inst, launchpad_id=launchpad_id) or {}
user = self._create_user(launchpad_id, email, user_name)
@ -237,7 +242,7 @@ class RecordProcessor(object):
user['user_name'] = user_name
LOG.debug('Created new user: %s', user)
utils.store_user(self.runtime_storage_inst, user)
user_processor.store_user(self.runtime_storage_inst, user)
return user
@ -473,7 +478,7 @@ class RecordProcessor(object):
yield bug_fixed
def _process_member(self, record):
user_id = "member:" + record['member_id']
user_id = user_processor.make_user_id(member_id=record['member_id'])
record['primary_key'] = user_id
record['date'] = utils.member_date_to_timestamp(record['date_joined'])
record['author_name'] = record['member_name']
@ -489,7 +494,7 @@ class RecordProcessor(object):
# _update_record_and_user function will create new user if needed
self._update_record_and_user(record)
record['company_name'] = company_name
user = utils.load_user(self.runtime_storage_inst, user_id)
user = user_processor.load_user(self.runtime_storage_inst, user_id)
user['user_name'] = record['author_name']
user['companies'] = [{
@ -498,7 +503,7 @@ class RecordProcessor(object):
}]
user['company_name'] = company_name
utils.store_user(self.runtime_storage_inst, user)
user_processor.store_user(self.runtime_storage_inst, user)
record['company_name'] = company_name
@ -704,7 +709,7 @@ class RecordProcessor(object):
core_old = user.get('core')
user['core'] = list(core_engineers.get(user['user_id'], []))
if user['core'] != core_old:
utils.store_user(self.runtime_storage_inst, user)
user_processor.store_user(self.runtime_storage_inst, user)
def _close_patch(self, cores, marks):
if len(marks) < 2:
@ -784,15 +789,15 @@ class RecordProcessor(object):
yield record
user = utils.load_user(self.runtime_storage_inst,
record['user_id'])
user = user_processor.load_user(self.runtime_storage_inst,
record['user_id'])
LOG.debug('Update user %s, company name changed to %s',
user, company_name)
user['companies'] = [{
'company_name': company_name,
'end_date': 0,
}]
utils.store_user(self.runtime_storage_inst, user)
user_processor.store_user(self.runtime_storage_inst, user)
def post_processing(self, release_index):
self.runtime_storage_inst.set_records(

View File

@ -0,0 +1,48 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def make_user_id(email=None, launchpad_id=None, member_id=None):
if member_id:
return 'member:%s' % member_id
else:
return launchpad_id or email
def store_user(runtime_storage_inst, user):
if not user.get('seq'):
user['seq'] = runtime_storage_inst.inc_user_count()
runtime_storage_inst.set_by_key('user:%s' % user['seq'], user)
if user.get('user_id'):
runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user)
if user.get('launchpad_id'):
runtime_storage_inst.set_by_key('user:%s' % user['launchpad_id'], user)
for email in user.get('emails') or []:
runtime_storage_inst.set_by_key('user:%s' % email, user)
def load_user(runtime_storage_inst, seq=None, user_id=None, email=None,
launchpad_id=None, member_id=None):
if member_id:
key = 'member:%s' % member_id
else:
key = seq or user_id or launchpad_id or email
if key:
return runtime_storage_inst.get_by_key('user:%s' % key)
return None
def delete_user(runtime_storage_inst, user):
runtime_storage_inst.delete_by_key('user:%s' % user['seq'])

View File

@ -163,28 +163,6 @@ def make_range(start, stop, step):
yield six.moves.range(last_full, stop)
def store_user(runtime_storage_inst, user):
if not user.get('seq'):
user['seq'] = runtime_storage_inst.inc_user_count()
runtime_storage_inst.set_by_key('user:%s' % user['seq'], user)
if user.get('user_id'):
runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user)
if user.get('launchpad_id'):
runtime_storage_inst.set_by_key('user:%s' % user['launchpad_id'], user)
for email in user.get('emails') or []:
runtime_storage_inst.set_by_key('user:%s' % email, user)
def load_user(runtime_storage_inst, user_id):
if user_id:
return runtime_storage_inst.get_by_key('user:%s' % user_id)
return None
def delete_user(runtime_storage_inst, user):
runtime_storage_inst.delete_by_key('user:%s' % user['seq'])
def load_repos(runtime_storage_inst):
return runtime_storage_inst.get_by_key('repos') or []

View File

@ -21,6 +21,7 @@ import testtools
from stackalytics.processor import record_processor
from stackalytics.processor import runtime_storage
from stackalytics.processor import user_processor
from stackalytics.processor import utils
@ -205,8 +206,9 @@ class TestRecordProcessor(testtools.TestCase):
}
self.assertRecordsMatch(expected_commit, processed_commit)
self.assertIn('johndoe@ibm.com', utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe')['emails'])
self.assertIn('johndoe@ibm.com', user_processor.load_user(
record_processor_inst.runtime_storage_inst,
user_id='john_doe')['emails'])
def test_process_commit_existing_user_new_email_known_company_static(self):
# User profile is configured in default_data. Email is new to us,
@ -237,8 +239,9 @@ class TestRecordProcessor(testtools.TestCase):
}
self.assertRecordsMatch(expected_commit, processed_commit)
self.assertIn('johndoe@ibm.com', utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe')['emails'])
self.assertIn('johndoe@ibm.com', user_processor.load_user(
record_processor_inst.runtime_storage_inst,
user_id='john_doe')['emails'])
def test_process_commit_existing_user_old_job_not_overridden(self):
# User is known to LP, his email is new to us, and maps to other
@ -300,8 +303,9 @@ class TestRecordProcessor(testtools.TestCase):
}
self.assertRecordsMatch(expected_commit, processed_commit)
self.assertIn('johndoe@gmail.com', utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe')['emails'])
self.assertIn('johndoe@gmail.com', user_processor.load_user(
record_processor_inst.runtime_storage_inst,
user_id='john_doe')['emails'])
def test_process_commit_existing_user_new_email_known_company_update(self):
record_processor_inst = self.make_record_processor(
@ -329,8 +333,8 @@ class TestRecordProcessor(testtools.TestCase):
}
self.assertRecordsMatch(expected_commit, processed_commit)
user = utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe')
user = user_processor.load_user(
record_processor_inst.runtime_storage_inst, user_id='john_doe')
self.assertIn('johndoe@gmail.com', user['emails'])
self.assertEqual('IBM', user['companies'][0]['company_name'],
message='User affiliation should be updated')
@ -355,8 +359,8 @@ class TestRecordProcessor(testtools.TestCase):
}
self.assertRecordsMatch(expected_commit, processed_commit)
user = utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe')
user = user_processor.load_user(
record_processor_inst.runtime_storage_inst, user_id='john_doe')
self.assertIn('johndoe@ibm.com', user['emails'])
self.assertEqual('IBM', user['companies'][0]['company_name'])
@ -378,8 +382,9 @@ class TestRecordProcessor(testtools.TestCase):
}
self.assertRecordsMatch(expected_commit, processed_commit)
user = utils.load_user(
record_processor_inst.runtime_storage_inst, 'johndoe@ibm.com')
user = user_processor.load_user(
record_processor_inst.runtime_storage_inst,
user_id='johndoe@ibm.com')
self.assertIn('johndoe@ibm.com', user['emails'])
self.assertEqual('IBM', user['companies'][0]['company_name'])
self.assertEqual(None, user['launchpad_id'])
@ -492,8 +497,8 @@ class TestRecordProcessor(testtools.TestCase):
'company_name': '*independent'},
processed_records[0])
user = utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe')
user = user_processor.load_user(
record_processor_inst.runtime_storage_inst, user_id='john_doe')
self.assertEqual({
'seq': 1,
'user_id': 'john_doe',
@ -527,8 +532,8 @@ class TestRecordProcessor(testtools.TestCase):
'company_name': '*independent'},
processed_records[0])
user = utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe')
user = user_processor.load_user(
record_processor_inst.runtime_storage_inst, user_id='john_doe')
self.assertEqual({
'seq': 1,
'user_id': 'john_doe',
@ -581,10 +586,12 @@ class TestRecordProcessor(testtools.TestCase):
'user_name': 'John Doe',
'emails': ['john_doe@gmail.com'],
'companies': [{'company_name': '*independent', 'end_date': 0}]}
self.assertEqual(user, utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe'))
self.assertEqual(user, utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com'))
self.assertEqual(user, user_processor.load_user(
record_processor_inst.runtime_storage_inst,
user_id='john_doe'))
self.assertEqual(user, user_processor.load_user(
record_processor_inst.runtime_storage_inst,
email='john_doe@gmail.com'))
def test_process_blueprint_then_commit(self):
record_processor_inst = self.make_record_processor(
@ -631,10 +638,12 @@ class TestRecordProcessor(testtools.TestCase):
'user_name': 'John Doe',
'emails': ['john_doe@gmail.com'],
'companies': [{'company_name': '*independent', 'end_date': 0}]}
self.assertEqual(user, utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe'))
self.assertEqual(user, utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com'))
self.assertEqual(user, user_processor.load_user(
record_processor_inst.runtime_storage_inst,
user_id='john_doe'))
self.assertEqual(user, user_processor.load_user(
record_processor_inst.runtime_storage_inst,
email='john_doe@gmail.com'))
def test_process_review_then_blueprint(self):
record_processor_inst = self.make_record_processor(
@ -679,10 +688,12 @@ class TestRecordProcessor(testtools.TestCase):
'user_name': 'John Doe',
'emails': ['john_doe@gmail.com'],
'companies': [{'company_name': '*independent', 'end_date': 0}]}
self.assertEqual(user, utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe'))
self.assertEqual(user, utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com'))
self.assertEqual(user, user_processor.load_user(
record_processor_inst.runtime_storage_inst,
user_id='john_doe'))
self.assertEqual(user, user_processor.load_user(
record_processor_inst.runtime_storage_inst,
email='john_doe@gmail.com'))
def test_create_member(self):
member_record = {'member_id': '123456789',
@ -702,8 +713,9 @@ class TestRecordProcessor(testtools.TestCase):
self.assertEqual(result_member['author_name'], 'John Doe')
self.assertEqual(result_member['company_name'], 'Mirantis')
result_user = utils.load_user(
record_processor_inst.runtime_storage_inst, 'member:123456789')
result_user = user_processor.load_user(
record_processor_inst.runtime_storage_inst,
member_id='123456789')
self.assertEqual(result_user['user_name'], 'John Doe')
self.assertEqual(result_user['company_name'], 'Mirantis')
@ -729,8 +741,9 @@ class TestRecordProcessor(testtools.TestCase):
self.assertEqual(result_member['author_name'], 'Bill Smith')
self.assertEqual(result_member['company_name'], 'Rackspace')
result_user = utils.load_user(
record_processor_inst.runtime_storage_inst, 'member:123456789')
result_user = user_processor.load_user(
record_processor_inst.runtime_storage_inst,
member_id='123456789')
self.assertEqual(result_user['user_name'], 'Bill Smith')
self.assertEqual(result_user['companies'],
@ -765,10 +778,12 @@ class TestRecordProcessor(testtools.TestCase):
'user_name': 'John Doe',
'emails': ['john_doe@gmail.com'],
'companies': [{'company_name': '*independent', 'end_date': 0}]}
self.assertEqual(user, utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com'))
self.assertEqual(user, utils.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe'))
self.assertEqual(user, user_processor.load_user(
record_processor_inst.runtime_storage_inst,
email='john_doe@gmail.com'))
self.assertEqual(user, user_processor.load_user(
record_processor_inst.runtime_storage_inst,
user_id='john_doe'))
def test_process_commit_then_review_with_different_email(self):
record_processor_inst = self.make_record_processor(
@ -808,11 +823,11 @@ class TestRecordProcessor(testtools.TestCase):
'user_name': 'John Doe',
'emails': ['john_doe@ibm.com', 'john_doe@gmail.com'],
'companies': [{'company_name': 'IBM', 'end_date': 0}]}
self.assertUsersMatch(user, utils.load_user(
self.assertUsersMatch(user, user_processor.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe'))
self.assertUsersMatch(user, utils.load_user(
self.assertUsersMatch(user, user_processor.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com'))
self.assertUsersMatch(user, utils.load_user(
self.assertUsersMatch(user, user_processor.load_user(
record_processor_inst.runtime_storage_inst, 'john_doe@ibm.com'))
def test_merge_users(self):
@ -857,12 +872,14 @@ class TestRecordProcessor(testtools.TestCase):
'companies': [{'company_name': 'IBM', 'end_date': 0}]}
runtime_storage_inst = record_processor_inst.runtime_storage_inst
self.assertEqual(2, runtime_storage_inst.get_by_key('user:count'))
self.assertEqual(None, utils.load_user(runtime_storage_inst, 1))
self.assertEqual(user, utils.load_user(runtime_storage_inst, 2))
self.assertEqual(user, utils.load_user(runtime_storage_inst,
'john_doe'))
self.assertEqual(user, utils.load_user(runtime_storage_inst,
'john_doe@ibm.com'))
self.assertEqual(None, user_processor.load_user(
runtime_storage_inst, 1))
self.assertEqual(user, user_processor.load_user(
runtime_storage_inst, 2))
self.assertEqual(user, user_processor.load_user(
runtime_storage_inst, 'john_doe'))
self.assertEqual(user, user_processor.load_user(
runtime_storage_inst, 'john_doe@ibm.com'))
# all records should have the same user_id and company name
for record in runtime_storage_inst.get_all_records():
@ -931,10 +948,10 @@ class TestRecordProcessor(testtools.TestCase):
'companies': [{'company_name': '*independent',
'end_date': 0}]}
runtime_storage_inst = record_processor_inst.runtime_storage_inst
self.assertUsersMatch(user_1, utils.load_user(runtime_storage_inst,
'john_doe'))
self.assertUsersMatch(user_2, utils.load_user(runtime_storage_inst,
'homer'))
self.assertUsersMatch(user_1, user_processor.load_user(
runtime_storage_inst, user_id='john_doe'))
self.assertUsersMatch(user_2, user_processor.load_user(
runtime_storage_inst, user_id='homer'))
def test_process_commit_with_coauthors(self):
record_processor_inst = self.make_record_processor(