diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index 811615223..6cafbebae 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -29,8 +29,6 @@ class RecordProcessor(object): self.domains_index = runtime_storage_inst.get_by_key('companies') - self.users_index = runtime_storage_inst.get_by_key('users') - self.releases = runtime_storage_inst.get_by_key('releases') self.releases_dates = [r['end_date'] for r in self.releases] @@ -142,18 +140,19 @@ class RecordProcessor(object): def update_user(self, record): email = record.get('author_email') - if email in self.users_index: - user = self.users_index[email] - else: + user = utils.load_user(self.runtime_storage_inst, email) + if not user: if record.get('launchpad_id'): launchpad_id = record.get('launchpad_id') user_name = record.get('author_name') else: launchpad_id, user_name = self._get_lp_info(email) - if (launchpad_id) and (launchpad_id in self.users_index): + if launchpad_id: + user = utils.load_user(self.runtime_storage_inst, launchpad_id) + + if user: # merge emails - user = self.users_index[launchpad_id] if email: self._update_user_profile(user, email) else: @@ -165,10 +164,6 @@ class RecordProcessor(object): user = self._create_user(launchpad_id, email, user_name) utils.store_user(self.runtime_storage_inst, user) - if email: - self.users_index[email] = user - if user['launchpad_id']: - self.users_index[user['launchpad_id']] = user return user @@ -198,7 +193,7 @@ class RecordProcessor(object): yield record def _spawn_review(self, record): - # copy everything except pathsets and flatten user data + # copy everything except patchsets and flatten user data review = dict([(k, v) for k, v in record.iteritems() if k not in ['patchSets', 'owner', 'createdOn']]) owner = record['owner'] @@ -340,8 +335,6 @@ class RecordProcessor(object): yield r - self.runtime_storage_inst.set_by_key('users', self.users_index) - def update(self, record_iterator, release_index): for record in record_iterator: need_update = False @@ -369,8 +362,6 @@ class RecordProcessor(object): if need_update: yield record - self.runtime_storage_inst.set_by_key('users', self.users_index) - def _get_records_for_users_to_update(self): users_reviews = {} valid_blueprints = {} @@ -424,7 +415,7 @@ class RecordProcessor(object): user_id = record['user_id'] if user_id in self.updated_users: - user = self.users_index[user_id] + user = utils.load_user(self.runtime_storage_inst, user_id) user_company_name = user['companies'][0]['company_name'] if record['company_name'] != user_company_name: LOG.debug('Update record %s: company changed to: %s', diff --git a/stackalytics/processor/runtime_storage.py b/stackalytics/processor/runtime_storage.py index f714ba3eb..e3d38218f 100644 --- a/stackalytics/processor/runtime_storage.py +++ b/stackalytics/processor/runtime_storage.py @@ -42,7 +42,7 @@ class RuntimeStorage(object): def get_by_key(self, key): pass - def set_by_key(self, key, head_commit_id): + def set_by_key(self, key, value): pass def get_update(self, pid): diff --git a/stackalytics/processor/utils.py b/stackalytics/processor/utils.py index 6cd125171..d2e166838 100644 --- a/stackalytics/processor/utils.py +++ b/stackalytics/processor/utils.py @@ -82,6 +82,10 @@ def make_range(start, stop, step): def store_user(runtime_storage_inst, user): runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user) + if user.get('launchpad_id'): + runtime_storage_inst.set_by_key('user:%s' % user['launchpad_id'], user) + for email in user.get('emails') or []: + runtime_storage_inst.set_by_key('user:%s' % email, user) def load_user(runtime_storage_inst, user_id): diff --git a/tests/unit/test_record_processor.py b/tests/unit/test_record_processor.py index 8296884b7..6f4369cfd 100644 --- a/tests/unit/test_record_processor.py +++ b/tests/unit/test_record_processor.py @@ -202,8 +202,8 @@ class TestRecordProcessor(testtools.TestCase): } self.assertRecordsMatch(expected_commit, processed_commit) - self.assertIn('johndoe@ibm.com', - record_processor_inst.users_index['john_doe']['emails']) + self.assertIn('johndoe@ibm.com', utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe')['emails']) def test_process_commit_existing_user_new_email_unknown_company(self): # User is known to LP, but his email is new to us. Should match @@ -232,8 +232,8 @@ class TestRecordProcessor(testtools.TestCase): } self.assertRecordsMatch(expected_commit, processed_commit) - self.assertIn('johndoe@gmail.com', - record_processor_inst.users_index['john_doe']['emails']) + self.assertIn('johndoe@gmail.com', utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe')['emails']) def test_process_commit_existing_user_new_email_known_company_update(self): record_processor_inst = self.make_record_processor( @@ -261,7 +261,8 @@ class TestRecordProcessor(testtools.TestCase): } self.assertRecordsMatch(expected_commit, processed_commit) - user = record_processor_inst.users_index['john_doe'] + user = utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe') self.assertIn('johndoe@gmail.com', user['emails']) self.assertEquals('IBM', user['companies'][0]['company_name'], message='User affiliation should be updated') @@ -286,7 +287,8 @@ class TestRecordProcessor(testtools.TestCase): } self.assertRecordsMatch(expected_commit, processed_commit) - user = record_processor_inst.users_index['john_doe'] + user = utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe') self.assertIn('johndoe@ibm.com', user['emails']) self.assertEquals('IBM', user['companies'][0]['company_name']) @@ -308,8 +310,8 @@ class TestRecordProcessor(testtools.TestCase): } self.assertRecordsMatch(expected_commit, processed_commit) - self.assertEquals(1, len(record_processor_inst.users_index)) - user = record_processor_inst.users_index['johndoe@ibm.com'] + user = utils.load_user( + record_processor_inst.runtime_storage_inst, 'johndoe@ibm.com') self.assertIn('johndoe@ibm.com', user['emails']) self.assertEquals('IBM', user['companies'][0]['company_name']) self.assertEquals(None, user['launchpad_id']) @@ -338,8 +340,8 @@ class TestRecordProcessor(testtools.TestCase): 'company_name': '*independent'}, processed_records[0]) - self.assertEquals(1, len(record_processor_inst.users_index)) - user = record_processor_inst.users_index['john_doe'] + user = utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe') self.assertEquals({ 'user_id': 'john_doe', 'launchpad_id': 'john_doe', @@ -372,8 +374,8 @@ class TestRecordProcessor(testtools.TestCase): 'company_name': '*independent'}, processed_records[0]) - self.assertEquals(1, len(record_processor_inst.users_index)) - user = record_processor_inst.users_index['john_doe'] + user = utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe') self.assertEquals({ 'user_id': 'john_doe', 'launchpad_id': 'john_doe', @@ -423,8 +425,10 @@ class TestRecordProcessor(testtools.TestCase): 'user_name': 'John Doe', 'emails': ['john_doe@gmail.com'], 'companies': [{'company_name': '*independent', 'end_date': 0}]} - self.assertEquals({'john_doe': user, 'john_doe@gmail.com': user}, - record_processor_inst.users_index) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe')) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com')) def test_process_blueprint_then_commit(self): record_processor_inst = self.make_record_processor( @@ -469,8 +473,10 @@ class TestRecordProcessor(testtools.TestCase): 'user_name': 'John Doe', 'emails': ['john_doe@gmail.com'], 'companies': [{'company_name': '*independent', 'end_date': 0}]} - self.assertEquals({'john_doe': user, 'john_doe@gmail.com': user}, - record_processor_inst.users_index) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe')) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com')) def test_process_review_then_blueprint(self): record_processor_inst = self.make_record_processor( @@ -513,8 +519,10 @@ class TestRecordProcessor(testtools.TestCase): 'user_name': 'John Doe', 'emails': ['john_doe@gmail.com'], 'companies': [{'company_name': '*independent', 'end_date': 0}]} - self.assertEquals({'john_doe': user, 'john_doe@gmail.com': user}, - record_processor_inst.users_index) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe')) + self.assertEquals(user, utils.load_user( + record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com')) # update records @@ -788,22 +796,37 @@ def generate_emails(author_name='John Doe', author_email='johndoe@gmail.com', def make_runtime_storage(users=None, companies=None, releases=None, repos=None): - def get_by_key(collection): - if collection == 'companies': + runtime_storage_cache = {} + + def get_by_key(key): + if key == 'companies': return _make_companies(companies or [ {"company_name": "*independent", "domains": [""]}, ]) - elif collection == 'users': + elif key == 'users': return _make_users(users or []) - elif collection == 'releases': + elif key == 'releases': return releases or RELEASES - elif collection == 'repos': + elif key == 'repos': return repos or REPOS else: - raise Exception('Wrong collection: %s' % collection) + return runtime_storage_cache.get(key) + + def set_by_key(key, value): + runtime_storage_cache[key] = value rs = mock.Mock(runtime_storage.RuntimeStorage) rs.get_by_key = mock.Mock(side_effect=get_by_key) + rs.set_by_key = mock.Mock(side_effect=set_by_key) + + if users: + for user in users: + set_by_key('user:%s' % user['user_id'], user) + if user.get('launchpad_id'): + set_by_key('user:%s' % user['launchpad_id'], user) + for email in user.get('emails') or []: + set_by_key('user:%s' % email, user) + return rs