From 6c7f19e011948b7b6ff77a408af85132176478ab Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Thu, 24 Aug 2017 18:01:01 +0200 Subject: [PATCH] Refactor record processor Move user-related and Launchpad-related functions from record processor to corresponding modules. Change-Id: I9bd08fec4bc074373255118ab1be702113c56a6f --- stackalytics/processor/launchpad_utils.py | 43 +++- stackalytics/processor/record_processor.py | 191 +++--------------- stackalytics/processor/user_processor.py | 170 +++++++++++++++- .../tests/unit/test_launchpad_utils.py | 46 +++++ .../tests/unit/test_record_processor.py | 21 +- .../tests/unit/test_user_processor.py | 31 +++ 6 files changed, 313 insertions(+), 189 deletions(-) create mode 100644 stackalytics/tests/unit/test_launchpad_utils.py diff --git a/stackalytics/processor/launchpad_utils.py b/stackalytics/processor/launchpad_utils.py index 0c7710943..89b676899 100644 --- a/stackalytics/processor/launchpad_utils.py +++ b/stackalytics/processor/launchpad_utils.py @@ -37,7 +37,7 @@ def link_to_launchpad_id(link): return link[link.find('~') + 1:] -def lp_profile_by_launchpad_id(launchpad_id): +def _lp_profile_by_launchpad_id(launchpad_id): LOG.debug('Lookup user id %s at Launchpad', launchpad_id) uri = LP_URI_V1 % ('~' + launchpad_id) lp_profile = utils.read_json_from_uri(uri, session=launchpad_session) @@ -45,7 +45,25 @@ def lp_profile_by_launchpad_id(launchpad_id): return lp_profile -def lp_profile_by_email(email): +def query_lp_user_name(launchpad_id): + """Query user name by Launchpad ID + + :param launchpad_id: user's launchpad id + :return: user name + """ + if not launchpad_id: + return None + + lp_profile = _lp_profile_by_launchpad_id(launchpad_id) + + if not lp_profile: + LOG.debug('User with id %s not found', launchpad_id) + return launchpad_id + + return lp_profile['display_name'] + + +def _lp_profile_by_email(email): LOG.debug('Lookup user email %s at Launchpad', email) uri = LP_URI_V1 % ('people/?ws.op=getByEmail&email=' + email) lp_profile = utils.read_json_from_uri(uri, session=launchpad_session) @@ -53,6 +71,27 @@ def lp_profile_by_email(email): return lp_profile +def query_lp_info(email): + """Query Launchpad ID and user name by email + + :param email: user email + :return: tuple (launchpad id, name) + """ + lp_profile = None + if not utils.check_email_validity(email): + LOG.debug('User email is not valid %s', email) + else: + lp_profile = _lp_profile_by_email(email) + + if not lp_profile: + LOG.debug('User with email %s not found', email) + return None, None + + LOG.debug('Email %(email)s is mapped to launchpad user %(lp)s', + {'email': email, 'lp': lp_profile['name']}) + return lp_profile['name'], lp_profile['display_name'] + + def lp_module_exists(module): uri = LP_URI_DEVEL % module request = utils.do_request(uri) diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index 8246ff5eb..1abe9f178 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -79,160 +79,6 @@ class RecordProcessor(object): return self.modules, self.alias_module_map - def _find_company(self, companies, date): - for r in companies: - if date < r['end_date']: - return r['company_name'], 'strict' - return companies[-1]['company_name'], 'open' # may be overridden - - def _get_company_by_email(self, email): - if not email: - return None - - name, at, domain = email.partition('@') - if domain: - parts = domain.split('.') - for i in range(len(parts), 1, -1): - m = '.'.join(parts[len(parts) - i:]) - if m in self.domains_index: - return self.domains_index[m] - return None - - def _create_user(self, launchpad_id, email, gerrit_id, zanata_id, - user_name): - company = (self._get_company_by_email(email) or - self._get_independent()) - emails = [] - if email: - emails = [email] - user = { - 'user_id': user_processor.make_user_id( - emails=emails, launchpad_id=launchpad_id, gerrit_id=gerrit_id, - zanata_id=zanata_id), - 'launchpad_id': launchpad_id, - 'user_name': user_name or '', - 'companies': [{ - 'company_name': company, - 'end_date': 0, - }], - 'emails': emails, - } - if gerrit_id: - user['gerrit_id'] = gerrit_id - if zanata_id: - user['zanata_id'] = zanata_id - return user - - def _get_lp_info(self, email): - lp_profile = None - if not utils.check_email_validity(email): - LOG.debug('User email is not valid %s', email) - else: - lp_profile = launchpad_utils.lp_profile_by_email(email) - - if not lp_profile: - LOG.debug('User with email %s not found', email) - return None, None - - LOG.debug('Email %(email)s is mapped to launchpad user %(lp)s', - {'email': email, 'lp': lp_profile['name']}) - return lp_profile['name'], lp_profile['display_name'] - - def _get_lp_user_name(self, launchpad_id): - if not launchpad_id: - return None - - lp_profile = launchpad_utils.lp_profile_by_launchpad_id(launchpad_id) - - if not lp_profile: - LOG.debug('User with id %s not found', launchpad_id) - return launchpad_id - - return lp_profile['display_name'] - - def _get_independent(self): - return '*independent' - - def _update_user_affiliation(self, user): - for email in user.get('emails'): - company_name = self._get_company_by_email(email) - uc = user['companies'] - if (company_name and (len(uc) == 1) and - (uc[0]['company_name'] == self._get_independent())): - LOG.debug('Updating affiliation of user %s to %s', - user['user_id'], company_name) - uc[0]['company_name'] = company_name - break - - def _get_user_exact_company(self, user): - if len(user.get('companies', [])) == 1: - return user['companies'][0]['company_name'] - return None - - def _merge_user_profiles(self, user_profiles): - LOG.debug('Merge profiles: %s', user_profiles) - - # check of there are more than 1 launchpad_id nor gerrit_id - lp_ids = set(u.get('launchpad_id') for u in user_profiles - if u.get('launchpad_id')) - if len(lp_ids) > 1: - LOG.debug('Ambiguous launchpad ids: %s on profiles: %s', - lp_ids, user_profiles) - g_ids = set(u.get('gerrit_id') for u in user_profiles - if u.get('gerrit_id')) - if len(g_ids) > 1: - LOG.debug('Ambiguous gerrit ids: %s on profiles: %s', - g_ids, user_profiles) - - merged_user = {} # merged user profile - - # collect ordinary fields - for key in ['seq', 'user_name', 'user_id', 'gerrit_id', 'github_id', - 'launchpad_id', 'companies', 'static', 'zanata_id']: - value = next((v.get(key) for v in user_profiles if v.get(key)), - None) - if value: - merged_user[key] = value - - # update user_id, prefer it to be equal to launchpad_id - merged_user['user_id'] = (merged_user.get('launchpad_id') or - merged_user.get('user_id')) - - # merge emails - emails = set([]) - core_in = set([]) - for u in user_profiles: - emails |= set(u.get('emails', [])) - core_in |= set(u.get('core', [])) - merged_user['emails'] = list(emails) - if core_in: - merged_user['core'] = list(core_in) - - # merge companies - merged_companies = merged_user['companies'] - for u in user_profiles: - companies = u.get('companies') - if companies: - if (companies[0]['company_name'] != self._get_independent() or - len(companies) > 1): - merged_companies = companies - break - merged_user['companies'] = merged_companies - - self._update_user_affiliation(merged_user) - - seqs = set(u.get('seq') for u in user_profiles if u.get('seq')) - if len(seqs) > 1: - # profiles are merged, keep only one, remove others - seqs.remove(merged_user['seq']) - - for u in user_profiles: - if u.get('seq') in seqs: - LOG.debug('Delete user: %s', u) - user_processor.delete_user( - self.runtime_storage_inst, u) - return merged_user - def _need_to_fetch_launchpad(self): return CONF.fetching_user_source == 'launchpad' @@ -246,7 +92,7 @@ class RecordProcessor(object): if (self._need_to_fetch_launchpad() and email and (not user_e) and (not launchpad_id) and (not user_e.get('launchpad_id'))): # query LP - launchpad_id, lp_user_name = self._get_lp_info(email) + launchpad_id, lp_user_name = launchpad_utils.query_lp_info(email) if lp_user_name: user_name = lp_user_name @@ -258,7 +104,8 @@ class RecordProcessor(object): (not launchpad_id) and (not user_e.get('launchpad_id'))): # query LP guessed_lp_id = gerrit_id - lp_user_name = self._get_lp_user_name(guessed_lp_id) + lp_user_name = launchpad_utils.query_lp_user_name( + guessed_lp_id) if lp_user_name == user_name: launchpad_id = guessed_lp_id else: @@ -272,7 +119,7 @@ class RecordProcessor(object): (not launchpad_id) and (not user_e.get('launchpad_id'))): # query LP guessed_lp_id = zanata_id - user_name = self._get_lp_user_name(guessed_lp_id) + user_name = launchpad_utils.query_lp_user_name(guessed_lp_id) if user_name != guessed_lp_id: launchpad_id = guessed_lp_id else: @@ -281,21 +128,26 @@ class RecordProcessor(object): user_l = user_processor.load_user( self.runtime_storage_inst, launchpad_id=launchpad_id) or {} - if ((user_e.get('seq') == user_l.get('seq') == user_g.get('seq') == - user_z.get('seq')) and user_e.get('seq')): + if user_processor.are_users_same([user_e, user_l, user_g, user_z]): # If sequence numbers are set and the same, merge is not needed return user_e - user = self._create_user(launchpad_id, email, gerrit_id, zanata_id, - user_name) + user = user_processor.create_user( + self.domains_index, launchpad_id, email, gerrit_id, zanata_id, + user_name) if user_e or user_l or user_g or user_z: - user = self._merge_user_profiles( - [user_e, user_l, user_g, user_z, user]) + # merge between existing profiles and a new one + user, users_to_delete = user_processor.merge_user_profiles( + self.domains_index, [user_e, user_l, user_g, user_z, user]) + + # delete all unneeded profiles + user_processor.delete_users( + self.runtime_storage_inst, users_to_delete) else: - # create new + # create new profile if (self._need_to_fetch_launchpad() and not user_name): - user_name = self._get_lp_user_name(launchpad_id) + user_name = launchpad_utils.query_lp_user_name(launchpad_id) if user_name: user['user_name'] = user_name LOG.debug('Created new user: %s', user) @@ -312,12 +164,15 @@ class RecordProcessor(object): if user.get('user_name'): record['author_name'] = user['user_name'] - company, policy = self._find_company(user['companies'], record['date']) + company, policy = user_processor.get_company_for_date( + user['companies'], record['date']) + if not user.get('static'): # for auto-generated profiles affiliation may be overridden if company != '*robots' and policy == 'open': - company = (self._get_company_by_email( - record.get('author_email')) or company) + company = (user_processor.get_company_by_email( + self.domains_index, record.get('author_email')) or company) + record['company_name'] = company def _process_commit(self, record): diff --git a/stackalytics/processor/user_processor.py b/stackalytics/processor/user_processor.py index 65e1c4165..63efa21e8 100644 --- a/stackalytics/processor/user_processor.py +++ b/stackalytics/processor/user_processor.py @@ -1,5 +1,3 @@ -# Copyright (c) 2014 Mirantis Inc. -# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -20,6 +18,9 @@ from oslo_log import log as logging LOG = logging.getLogger(__name__) +INDEPENDENT = '*independent' +ROBOTS = '*robots' + def make_user_id(emails=None, launchpad_id=None, gerrit_id=None, member_id=None, github_id=None, ci_id=None, zanata_id=None): @@ -74,9 +75,10 @@ def load_user(runtime_storage_inst, seq=None, user_id=None, email=None, return None -def delete_user(runtime_storage_inst, user): - LOG.debug('Delete user: %s', user) - runtime_storage_inst.delete_by_key('user:%s' % user['seq']) +def delete_users(runtime_storage_inst, users): + for user in users: + LOG.debug('Delete user: %s', user) + runtime_storage_inst.delete_by_key('user:%s' % user['seq']) def update_user_profile(stored_user, user): @@ -90,3 +92,161 @@ def update_user_profile(stored_user, user): updated_user = copy.deepcopy(user) updated_user['static'] = True return updated_user + + +def get_company_for_date(companies, date): + for r in companies: + if date < r['end_date']: + return r['company_name'], 'strict' + return companies[-1]['company_name'], 'open' # may be overridden + + +def get_company_by_email(domains_index, email): + """Get company based on email domain + + Automatically maps email domain into company name. Prefers + subdomains to root domains. + + :param domains_index: dict {domain -> company name} + :param email: valid email. may be empty + :return: company name or None if nothing matches + """ + if not email: + return None + + name, at, domain = email.partition('@') + if domain: + parts = domain.split('.') + for i in range(len(parts), 1, -1): + m = '.'.join(parts[len(parts) - i:]) + if m in domains_index: + return domains_index[m] + return None + + +def create_user(domains_index, launchpad_id, email, gerrit_id, zanata_id, + user_name): + company = get_company_by_email(domains_index, email) or INDEPENDENT + emails = [email] if email else [] + + user = { + 'user_id': make_user_id( + emails=emails, launchpad_id=launchpad_id, gerrit_id=gerrit_id, + zanata_id=zanata_id), + 'launchpad_id': launchpad_id, + 'user_name': user_name or '', + 'companies': [{ + 'company_name': company, + 'end_date': 0, + }], + 'emails': emails, + } + + if gerrit_id: + user['gerrit_id'] = gerrit_id + if zanata_id: + user['zanata_id'] = zanata_id + + return user + + +def update_user_affiliation(domains_index, user): + """Update user affiliation + + Affiliation is updated only if user is currently independent + but makes contribution from company domain. + + :param domains_index: dict {domain -> company name} + :param user: user profile + """ + for email in user.get('emails'): + company_name = get_company_by_email(domains_index, email) + + uc = user['companies'] + if (company_name and (len(uc) == 1) and + (uc[0]['company_name'] == INDEPENDENT)): + LOG.debug('Updating affiliation of user %s to %s', + user['user_id'], company_name) + uc[0]['company_name'] = company_name + break + + +def merge_user_profiles(domains_index, user_profiles): + """Merge user profiles into one + + The function merges list of user profiles into one figures out which + profiles can be deleted. + + :param domains_index: dict {domain -> company name} + :param user_profiles: user profiles to merge + :return: tuple (merged user profile, [user profiles to delete]) + """ + LOG.debug('Merge profiles: %s', user_profiles) + + # check of there are more than 1 launchpad_id nor gerrit_id + lp_ids = set(u.get('launchpad_id') for u in user_profiles + if u.get('launchpad_id')) + if len(lp_ids) > 1: + LOG.debug('Ambiguous launchpad ids: %s on profiles: %s', + lp_ids, user_profiles) + g_ids = set(u.get('gerrit_id') for u in user_profiles + if u.get('gerrit_id')) + if len(g_ids) > 1: + LOG.debug('Ambiguous gerrit ids: %s on profiles: %s', + g_ids, user_profiles) + + merged_user = {} # merged user profile + + # collect ordinary fields + for key in ['seq', 'user_name', 'user_id', 'gerrit_id', 'github_id', + 'launchpad_id', 'companies', 'static', 'zanata_id']: + value = next((v.get(key) for v in user_profiles if v.get(key)), + None) + if value: + merged_user[key] = value + + # update user_id, prefer it to be equal to launchpad_id + merged_user['user_id'] = (merged_user.get('launchpad_id') or + merged_user.get('user_id')) + + # merge emails + emails = set([]) + core_in = set([]) + for u in user_profiles: + emails |= set(u.get('emails', [])) + core_in |= set(u.get('core', [])) + merged_user['emails'] = list(emails) + if core_in: + merged_user['core'] = list(core_in) + + # merge companies + merged_companies = merged_user['companies'] + for u in user_profiles: + companies = u.get('companies') + if companies: + if (companies[0]['company_name'] != INDEPENDENT or + len(companies) > 1): + merged_companies = companies + break + merged_user['companies'] = merged_companies + + update_user_affiliation(domains_index, merged_user) + + users_to_delete = [] + seqs = set(u.get('seq') for u in user_profiles if u.get('seq')) + + if len(seqs) > 1: + # profiles are merged, keep only one, remove others + seqs.remove(merged_user['seq']) + + for u in user_profiles: + if u.get('seq') in seqs: + users_to_delete.append(u) + + return merged_user, users_to_delete + + +def are_users_same(users): + """True if all users are the same and not Nones""" + x = set(u.get('seq') for u in users) + return len(x) == 1 and None not in x diff --git a/stackalytics/tests/unit/test_launchpad_utils.py b/stackalytics/tests/unit/test_launchpad_utils.py new file mode 100644 index 000000000..0dd62d9b7 --- /dev/null +++ b/stackalytics/tests/unit/test_launchpad_utils.py @@ -0,0 +1,46 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import testtools + +from stackalytics.processor import launchpad_utils + + +class TestLaunchpadUtils(testtools.TestCase): + + @mock.patch('stackalytics.processor.launchpad_utils._lp_profile_by_email') + def test_get_lp_info(self, lp_mock): + lp_mock.return_value = dict(name='john', display_name='smith') + + observed = launchpad_utils.query_lp_info('john@smith.to') + + self.assertEqual(('john', 'smith'), observed) + lp_mock.assert_called_once_with('john@smith.to') + + @mock.patch('stackalytics.processor.launchpad_utils._lp_profile_by_email') + def test_get_lp_info_not_found(self, lp_mock): + lp_mock.return_value = None + + observed = launchpad_utils.query_lp_info('john@smith.to') + + self.assertEqual((None, None), observed) + lp_mock.assert_called_once_with('john@smith.to') + + @mock.patch('stackalytics.processor.launchpad_utils._lp_profile_by_email') + def test_get_lp_info_invalid_email(self, lp_mock): + + observed = launchpad_utils.query_lp_info('error.root') + + self.assertEqual((None, None), observed) + lp_mock.assert_not_called() diff --git a/stackalytics/tests/unit/test_record_processor.py b/stackalytics/tests/unit/test_record_processor.py index ef339aa33..5e468c3ca 100644 --- a/stackalytics/tests/unit/test_record_processor.py +++ b/stackalytics/tests/unit/test_record_processor.py @@ -24,6 +24,7 @@ from stackalytics.processor import config from stackalytics.processor import record_processor from stackalytics.processor import runtime_storage from stackalytics.processor import user_processor +from stackalytics.processor.user_processor import get_company_by_email from stackalytics.processor import utils @@ -62,12 +63,12 @@ class TestRecordProcessor(testtools.TestCase): self.read_launchpad = self.read_json_from_uri_patch.start() self.lp_profile_by_launchpad_id_patch = mock.patch( 'stackalytics.processor.launchpad_utils.' - 'lp_profile_by_launchpad_id') + '_lp_profile_by_launchpad_id') self.lp_profile_by_launchpad_id = ( self.lp_profile_by_launchpad_id_patch.start()) self.lp_profile_by_launchpad_id.return_value = None self.lp_profile_by_email_patch = mock.patch( - 'stackalytics.processor.launchpad_utils.lp_profile_by_email') + 'stackalytics.processor.launchpad_utils._lp_profile_by_email') self.lp_profile_by_email = ( self.lp_profile_by_email_patch.start()) self.lp_profile_by_email.return_value = None @@ -86,7 +87,7 @@ class TestRecordProcessor(testtools.TestCase): companies=[{'company_name': 'IBM', 'domains': ['ibm.com']}] ) email = 'jdoe@ibm.com' - res = record_processor_inst._get_company_by_email(email) + res = get_company_by_email(record_processor_inst.domains_index, email) self.assertEqual('IBM', res) def test_get_company_by_email_with_long_suffix_mapped(self): @@ -94,7 +95,7 @@ class TestRecordProcessor(testtools.TestCase): companies=[{'company_name': 'NEC', 'domains': ['nec.co.jp']}] ) email = 'man@mxw.nes.nec.co.jp' - res = record_processor_inst._get_company_by_email(email) + res = get_company_by_email(record_processor_inst.domains_index, email) self.assertEqual('NEC', res) def test_get_company_by_email_with_long_suffix_mapped_2(self): @@ -103,23 +104,15 @@ class TestRecordProcessor(testtools.TestCase): 'domains': ['nec.co.jp', 'nec.com']}] ) email = 'man@mxw.nes.nec.com' - res = record_processor_inst._get_company_by_email(email) + res = get_company_by_email(record_processor_inst.domains_index, email) self.assertEqual('NEC', res) def test_get_company_by_email_not_mapped(self): record_processor_inst = self.make_record_processor() email = 'foo@boo.com' - res = record_processor_inst._get_company_by_email(email) + res = get_company_by_email(record_processor_inst.domains_index, email) self.assertIsNone(res) - # get_lp_info - - def test_get_lp_info_invalid_email(self): - self.read_launchpad.return_value = None - record_processor_inst = self.make_record_processor(users=[]) - self.assertEqual((None, None), - record_processor_inst._get_lp_info('error.root')) - # commit processing def test_process_commit_existing_user(self): diff --git a/stackalytics/tests/unit/test_user_processor.py b/stackalytics/tests/unit/test_user_processor.py index f5fa7449d..c636d003e 100644 --- a/stackalytics/tests/unit/test_user_processor.py +++ b/stackalytics/tests/unit/test_user_processor.py @@ -85,3 +85,34 @@ class TestUserProcessor(testtools.TestCase): updated_user = user_processor.update_user_profile(stored_user, user) self.assertTrue(updated_user.get('static')) + + def test_are_users_same(self): + users = [ + dict(seq=1), + dict(seq=1), + dict(seq=1), + ] + self.assertTrue(user_processor.are_users_same(users)) + + def test_are_users_same_none(self): + users = [ + {}, + {}, + ] + self.assertFalse(user_processor.are_users_same(users)) + + def test_are_users_not_same(self): + users = [ + dict(seq=1), + dict(seq=2), + dict(seq=1), + ] + self.assertFalse(user_processor.are_users_same(users)) + + def test_are_users_not_same_2(self): + users = [ + dict(seq=1), + dict(seq=1), + {} + ] + self.assertFalse(user_processor.are_users_same(users))