diff --git a/stackalytics/processor/default_data_processor.py b/stackalytics/processor/default_data_processor.py index f6f8b960f..cc66ba5ea 100644 --- a/stackalytics/processor/default_data_processor.py +++ b/stackalytics/processor/default_data_processor.py @@ -124,7 +124,7 @@ def _update_with_driverlog_data(default_data, driverlog_data_uri): cis[module][driver['ci']['id']] = driver default_data['users'].append({ - 'launchpad_id': driver['ci']['id'], + 'gerrit_id': driver['ci']['id'], 'user_name': driver['ci']['id'], 'companies': [ {'company_name': driver['vendor'], 'end_date': None}], diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index f55e3b587..6777f6f09 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -95,12 +95,12 @@ class RecordProcessor(object): return self.domains_index[m] return None - def _create_user(self, launchpad_id, email, user_name): + def _create_user(self, launchpad_id, email, gerrit_id, user_name): company = (self._get_company_by_email(email) or self._get_independent()) user = { 'user_id': user_processor.make_user_id( - email=email, launchpad_id=launchpad_id), + email=email, launchpad_id=launchpad_id, gerrit_id=gerrit_id), 'launchpad_id': launchpad_id, 'user_name': user_name or '', 'companies': [{ @@ -108,6 +108,8 @@ class RecordProcessor(object): 'end_date': 0, }], } + if gerrit_id: + user['gerrit_id'] = gerrit_id if email: user['emails'] = [email] else: @@ -164,17 +166,16 @@ class RecordProcessor(object): merged_user = {} # merged user profile # collect ordinary fields - for key in ['seq', 'user_name', 'user_id', + for key in ['seq', 'user_name', 'user_id', 'gerrit_id', 'launchpad_id', 'companies', 'static']: - merged_user[key] = next((v.get(key) for v in user_profiles - if v.get(key)), None) - - if not merged_user['static']: - del merged_user['static'] + value = next((v.get(key) for v in user_profiles if v.get(key)), + None) + if value: + merged_user[key] = value # update user_id, prefer it to be equal to launchpad_id - merged_user['user_id'] = (merged_user['launchpad_id'] or - merged_user['user_id']) + merged_user['user_id'] = (merged_user.get('launchpad_id') or + merged_user.get('user_id')) # merge emails emails = set([]) @@ -183,7 +184,8 @@ class RecordProcessor(object): emails |= set(u.get('emails', [])) core_in |= set(u.get('core', [])) merged_user['emails'] = list(emails) - merged_user['core'] = list(core_in) + if core_in: + merged_user['core'] = list(core_in) # merge companies merged_companies = merged_user['companies'] @@ -223,17 +225,32 @@ class RecordProcessor(object): if lp_user_name: user_name = lp_user_name + gerrit_id = record.get('gerrit_id') + if gerrit_id: + user_g = user_processor.load_user( + self.runtime_storage_inst, gerrit_id=gerrit_id) or {} + if (not user_g) and (not launchpad_id): + # query LP + guessed_lp_id = gerrit_id + lp_user_name = self._get_lp_user_name(guessed_lp_id) + if lp_user_name == user_name: + launchpad_id = guessed_lp_id + else: + user_g = {} + user_l = user_processor.load_user( self.runtime_storage_inst, launchpad_id=launchpad_id) or {} - user = self._create_user(launchpad_id, email, user_name) + user = self._create_user(launchpad_id, email, gerrit_id, user_name) - if (user_e.get('seq') == user_l.get('seq')) and user_e.get('seq'): + if ((user_e.get('seq') == user_l.get('seq') == user_g.get('seq')) and + user_e.get('seq')): # sequence numbers are set and the same, merge is not needed user = user_e else: - if user_e or user_l: - user = self._merge_user_profiles([user_e, user_l, user]) + if user_e or user_l or user_g: + user = self._merge_user_profiles( + [user_e, user_l, user_g, user]) else: # create new if not user_name: @@ -243,6 +260,7 @@ class RecordProcessor(object): LOG.debug('Created new user: %s', user) user_processor.store_user(self.runtime_storage_inst, user) + LOG.debug('Stored user: %s', user) return user @@ -250,8 +268,8 @@ class RecordProcessor(object): user = self.update_user(record) record['user_id'] = user['user_id'] - record['launchpad_id'] = user['launchpad_id'] - + if user.get('launchpad_id'): + record['launchpad_id'] = user['launchpad_id'] if user.get('user_name'): record['author_name'] = user['user_name'] @@ -299,7 +317,7 @@ class RecordProcessor(object): owner = record['owner'] review['primary_key'] = review['id'] - review['launchpad_id'] = owner['username'] + review['gerrit_id'] = owner['username'].lower() review['author_name'] = owner['name'] review['author_email'] = owner['email'].lower() review['date'] = record['createdOn'] @@ -329,7 +347,7 @@ class RecordProcessor(object): patch_record['number'] = patch['number'] patch_record['date'] = patch['createdOn'] uploader = patch['uploader'] - patch_record['launchpad_id'] = uploader['username'] + patch_record['gerrit_id'] = uploader['username'].lower() patch_record['author_name'] = uploader['name'] patch_record['author_email'] = uploader['email'].lower() patch_record['module'] = review['module'] @@ -349,7 +367,7 @@ class RecordProcessor(object): mark['value'] = int(approval['value']) mark['date'] = approval['grantedOn'] mark['primary_key'] = (review['id'] + str(mark['date']) + mark['type']) - mark['launchpad_id'] = reviewer['username'] + mark['gerrit_id'] = reviewer['username'].lower() mark['author_name'] = reviewer['name'] mark['author_email'] = reviewer['email'].lower() mark['module'] = review['module'] @@ -663,18 +681,15 @@ class RecordProcessor(object): def _update_reviews_with_sequence_number(self): LOG.debug('Set review number in review records') - users_reviews = {} + users_reviews = collections.defaultdict(list) for record in self.runtime_storage_inst.get_all_records(): if record['record_type'] == 'review': - launchpad_id = record['launchpad_id'] + user_id = record.get('user_id') review = {'date': record['date'], 'id': record['id']} - if launchpad_id in users_reviews: - users_reviews[launchpad_id].append(review) - else: - users_reviews[launchpad_id] = [review] + users_reviews[user_id].append(review) reviews_index = {} - for launchpad_id, reviews in six.iteritems(users_reviews): + for reviews in six.itervalues(users_reviews): reviews.sort(key=lambda x: x['date']) review_number = 0 for review in reviews: @@ -692,7 +707,7 @@ class RecordProcessor(object): def _determine_core_contributors(self): LOG.debug('Determine core contributors') - core_engineers = {} + module_branches = collections.defaultdict(set) quarter_ago = int(time.time()) - 60 * 60 * 24 * 30 * 3 # a quarter ago for record in self.runtime_storage_inst.get_all_records(): @@ -701,14 +716,17 @@ class RecordProcessor(object): record['value'] in [2, -2]): module_branch = (record['module'], record['branch']) user_id = record['user_id'] - if user_id in core_engineers: - core_engineers[user_id].add(module_branch) - else: - core_engineers[user_id] = set([module_branch]) + module_branches[user_id].add(module_branch) + for user in self.runtime_storage_inst.get_all_users(): core_old = user.get('core') - user['core'] = list(core_engineers.get(user['user_id'], [])) - if user['core'] != core_old: + user_module_branch = module_branches.get(user['user_id']) + if user_module_branch: + user['core'] = list(user_module_branch) + elif user.get('core'): + del user['core'] + + if user.get('core') != core_old: user_processor.store_user(self.runtime_storage_inst, user) def _close_patch(self, cores, marks): @@ -737,7 +755,7 @@ class RecordProcessor(object): cores = set() for user in self.runtime_storage_inst.get_all_users(): - for (module, branch) in (user['core'] or []): + for (module, branch) in (user.get('core') or []): cores.add((module, branch, user['user_id'])) # map from review_id to current patch and list of marks diff --git a/stackalytics/processor/user_processor.py b/stackalytics/processor/user_processor.py index 7ee07edb0..97a4b66d0 100644 --- a/stackalytics/processor/user_processor.py +++ b/stackalytics/processor/user_processor.py @@ -14,11 +14,14 @@ # limitations under the License. -def make_user_id(email=None, launchpad_id=None, member_id=None): +def make_user_id(email=None, launchpad_id=None, gerrit_id=None, + member_id=None): + if launchpad_id or email: + return launchpad_id or email + if gerrit_id: + return 'gerrit:%s' % gerrit_id if member_id: return 'member:%s' % member_id - else: - return launchpad_id or email def store_user(runtime_storage_inst, user): @@ -29,13 +32,18 @@ def store_user(runtime_storage_inst, user): runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user) if user.get('launchpad_id'): runtime_storage_inst.set_by_key('user:%s' % user['launchpad_id'], user) + if user.get('gerrit_id'): + runtime_storage_inst.set_by_key('user:gerrit:%s' % user['gerrit_id'], + user) for email in user.get('emails') or []: runtime_storage_inst.set_by_key('user:%s' % email, user) def load_user(runtime_storage_inst, seq=None, user_id=None, email=None, - launchpad_id=None, member_id=None): - if member_id: + launchpad_id=None, gerrit_id=None, member_id=None): + if gerrit_id: + key = 'gerrit:%s' % gerrit_id + elif member_id: key = 'member:%s' % member_id else: key = seq or user_id or launchpad_id or email diff --git a/tests/unit/test_record_processor.py b/tests/unit/test_record_processor.py index c5d0cf11d..79cbfd776 100644 --- a/tests/unit/test_record_processor.py +++ b/tests/unit/test_record_processor.py @@ -580,9 +580,9 @@ class TestRecordProcessor(testtools.TestCase): processed_records[1]) user = {'seq': 1, - 'core': [], 'user_id': 'john_doe', 'launchpad_id': 'john_doe', + 'gerrit_id': 'john_doe', 'user_name': 'John Doe', 'emails': ['john_doe@gmail.com'], 'companies': [{'company_name': '*independent', 'end_date': 0}]} @@ -632,7 +632,6 @@ class TestRecordProcessor(testtools.TestCase): processed_records[1]) user = {'seq': 1, - 'core': [], 'user_id': 'john_doe', 'launchpad_id': 'john_doe', 'user_name': 'John Doe', @@ -682,9 +681,9 @@ class TestRecordProcessor(testtools.TestCase): processed_records[1]) user = {'seq': 1, - 'core': [], 'user_id': 'john_doe', 'launchpad_id': 'john_doe', + 'gerrit_id': 'john_doe', 'user_name': 'John Doe', 'emails': ['john_doe@gmail.com'], 'companies': [{'company_name': '*independent', 'end_date': 0}]} @@ -750,8 +749,8 @@ class TestRecordProcessor(testtools.TestCase): [{'company_name': 'Rackspace', 'end_date': 0}]) def test_process_email_then_review(self): - # it is expected that the user profile will contain both email and - # LP id + # it is expected that the user profile will contain email and + # gerrit id, while LP id will be None record_processor_inst = self.make_record_processor() list(record_processor_inst.process([ @@ -772,9 +771,47 @@ class TestRecordProcessor(testtools.TestCase): ])) user = {'seq': 1, - 'core': [], + 'user_id': 'john_doe@gmail.com', + 'gerrit_id': 'john_doe', + 'user_name': 'John Doe', + 'emails': ['john_doe@gmail.com'], + 'companies': [{'company_name': '*independent', 'end_date': 0}]} + self.assertEqual(user, user_processor.load_user( + record_processor_inst.runtime_storage_inst, + email='john_doe@gmail.com')) + self.assertEqual(user, user_processor.load_user( + record_processor_inst.runtime_storage_inst, + gerrit_id='john_doe')) + + def test_process_email_then_review_gerrit_id_same_as_launchpad_id(self): + # it is expected that the user profile will contain email, LP id and + # gerrit id + record_processor_inst = self.make_record_processor( + lp_user_name={'john_doe': {'name': 'john_doe', + 'display_name': 'John Doe'}} + ) + + list(record_processor_inst.process([ + {'record_type': 'email', + 'message_id': '', + 'author_email': 'john_doe@gmail.com', + 'subject': 'hello, world!', + 'body': 'lorem ipsum', + 'date': 1234567890}, + {'record_type': 'review', + 'id': 'I1045730e47e9e6ad31fcdfbaefdad77e2f3b2c3e', + 'subject': 'Fix AttributeError in Keypair._add_details()', + 'owner': {'name': 'John Doe', + 'email': 'john_doe@gmail.com', + 'username': 'john_doe'}, + 'createdOn': 1379404951, + 'module': 'nova', 'branch': 'master'} + ])) + + user = {'seq': 1, 'user_id': 'john_doe', 'launchpad_id': 'john_doe', + 'gerrit_id': 'john_doe', 'user_name': 'John Doe', 'emails': ['john_doe@gmail.com'], 'companies': [{'company_name': '*independent', 'end_date': 0}]} @@ -784,11 +821,16 @@ class TestRecordProcessor(testtools.TestCase): self.assertEqual(user, user_processor.load_user( record_processor_inst.runtime_storage_inst, user_id='john_doe')) + self.assertEqual(user, user_processor.load_user( + record_processor_inst.runtime_storage_inst, + gerrit_id='john_doe')) def test_process_commit_then_review_with_different_email(self): record_processor_inst = self.make_record_processor( lp_info={'john_doe@gmail.com': {'name': 'john_doe', 'display_name': 'John Doe'}}, + lp_user_name={'john_doe': {'name': 'john_doe', + 'display_name': 'John Doe'}}, companies=[{'company_name': 'IBM', 'domains': ['ibm.com']}]) list(record_processor_inst.process([ @@ -817,18 +859,20 @@ class TestRecordProcessor(testtools.TestCase): 'username': 'john_doe'}}]}]} ])) user = {'seq': 1, - 'core': [], 'user_id': 'john_doe', 'launchpad_id': 'john_doe', 'user_name': 'John Doe', 'emails': ['john_doe@ibm.com', 'john_doe@gmail.com'], 'companies': [{'company_name': 'IBM', 'end_date': 0}]} self.assertUsersMatch(user, user_processor.load_user( - record_processor_inst.runtime_storage_inst, 'john_doe')) + record_processor_inst.runtime_storage_inst, + user_id='john_doe')) self.assertUsersMatch(user, user_processor.load_user( - record_processor_inst.runtime_storage_inst, 'john_doe@gmail.com')) + record_processor_inst.runtime_storage_inst, + email='john_doe@gmail.com')) self.assertUsersMatch(user, user_processor.load_user( - record_processor_inst.runtime_storage_inst, 'john_doe@ibm.com')) + record_processor_inst.runtime_storage_inst, + email='john_doe@ibm.com')) def test_merge_users(self): record_processor_inst = self.make_record_processor( @@ -864,9 +908,9 @@ class TestRecordProcessor(testtools.TestCase): record_processor_inst.post_processing({}) user = {'seq': 2, - 'core': [], 'user_id': 'john_doe', 'launchpad_id': 'john_doe', + 'gerrit_id': 'john_doe', 'user_name': 'John Doe', 'emails': ['john_doe@ibm.com'], 'companies': [{'company_name': 'IBM', 'end_date': 0}]} @@ -877,9 +921,11 @@ class TestRecordProcessor(testtools.TestCase): self.assertEqual(user, user_processor.load_user( runtime_storage_inst, 2)) self.assertEqual(user, user_processor.load_user( - runtime_storage_inst, 'john_doe')) + runtime_storage_inst, user_id='john_doe')) self.assertEqual(user, user_processor.load_user( - runtime_storage_inst, 'john_doe@ibm.com')) + runtime_storage_inst, email='john_doe@ibm.com')) + self.assertEqual(user, user_processor.load_user( + runtime_storage_inst, gerrit_id='john_doe')) # all records should have the same user_id and company name for record in runtime_storage_inst.get_all_records(): @@ -891,7 +937,8 @@ class TestRecordProcessor(testtools.TestCase): def test_core_user_guess(self): record_processor_inst = self.make_record_processor( lp_user_name={ - 'john_doe': {'name': 'john_doe', 'display_name': 'John Doe'} + 'john_doe': {'name': 'john_doe', 'display_name': 'John Doe'}, + 'homer': {'name': 'homer', 'display_name': 'Homer Simpson'}, }, companies=[{'company_name': 'IBM', 'domains': ['ibm.com']}], ) @@ -944,7 +991,6 @@ class TestRecordProcessor(testtools.TestCase): user_2 = {'seq': 3, 'user_id': 'homer', 'launchpad_id': 'homer', 'user_name': 'Homer Simpson', 'emails': ['hsimpson@gmail.com'], - 'core': [], 'companies': [{'company_name': '*independent', 'end_date': 0}]} runtime_storage_inst = record_processor_inst.runtime_storage_inst @@ -1440,16 +1486,17 @@ class TestRecordProcessor(testtools.TestCase): def assertRecordsMatch(self, expected, actual): for key, value in six.iteritems(expected): - self.assertEqual(value, actual[key], + self.assertEqual(value, actual.get(key), 'Values for key %s do not match' % key) def assertUsersMatch(self, expected, actual): + self.assertIsNotNone(actual, 'User should not be None') match = True for key, value in six.iteritems(expected): if key == 'emails': - match = (set(value) == set(actual[key])) + match = (set(value) == set(actual.get(key))) else: - match = (value == actual[key]) + match = (value == actual.get(key)) self.assertTrue(match, 'User %s should match %s' % (actual, expected))