Refactor record processor

Move user-related and Launchpad-related functions from record
processor to corresponding modules.

Change-Id: I9bd08fec4bc074373255118ab1be702113c56a6f
This commit is contained in:
Ilya Shakhat 2017-08-24 18:01:01 +02:00
parent afce3ea987
commit 6c7f19e011
6 changed files with 313 additions and 189 deletions

View File

@ -37,7 +37,7 @@ def link_to_launchpad_id(link):
return link[link.find('~') + 1:]
def lp_profile_by_launchpad_id(launchpad_id):
def _lp_profile_by_launchpad_id(launchpad_id):
LOG.debug('Lookup user id %s at Launchpad', launchpad_id)
uri = LP_URI_V1 % ('~' + launchpad_id)
lp_profile = utils.read_json_from_uri(uri, session=launchpad_session)
@ -45,7 +45,25 @@ def lp_profile_by_launchpad_id(launchpad_id):
return lp_profile
def lp_profile_by_email(email):
def query_lp_user_name(launchpad_id):
    """Resolve a Launchpad ID to the user's display name

    :param launchpad_id: user's launchpad id, may be empty
    :return: None for an empty id; the 'display_name' of the Launchpad
        profile when one is found; otherwise the launchpad id itself
    """
    if launchpad_id:
        profile = _lp_profile_by_launchpad_id(launchpad_id)
        if profile:
            return profile['display_name']

        # no profile came back: fall back to the id so callers still get
        # a non-empty name
        LOG.debug('User with id %s not found', launchpad_id)
        return launchpad_id

    return None
def _lp_profile_by_email(email):
LOG.debug('Lookup user email %s at Launchpad', email)
uri = LP_URI_V1 % ('people/?ws.op=getByEmail&email=' + email)
lp_profile = utils.read_json_from_uri(uri, session=launchpad_session)
@ -53,6 +71,27 @@ def lp_profile_by_email(email):
return lp_profile
def query_lp_info(email):
    """Look up Launchpad ID and user name for an email address

    The Launchpad lookup is attempted only when the email passes
    utils.check_email_validity.

    :param email: user email
    :return: tuple (launchpad id, name); (None, None) when the email is
        not valid or no profile is found
    """
    if utils.check_email_validity(email):
        profile = _lp_profile_by_email(email)
    else:
        LOG.debug('User email is not valid %s', email)
        profile = None

    if profile:
        LOG.debug('Email %(email)s is mapped to launchpad user %(lp)s',
                  {'email': email, 'lp': profile['name']})
        return profile['name'], profile['display_name']

    LOG.debug('User with email %s not found', email)
    return None, None
def lp_module_exists(module):
uri = LP_URI_DEVEL % module
request = utils.do_request(uri)

View File

@ -79,160 +79,6 @@ class RecordProcessor(object):
return self.modules, self.alias_module_map
def _find_company(self, companies, date):
for r in companies:
if date < r['end_date']:
return r['company_name'], 'strict'
return companies[-1]['company_name'], 'open' # may be overridden
def _get_company_by_email(self, email):
if not email:
return None
name, at, domain = email.partition('@')
if domain:
parts = domain.split('.')
for i in range(len(parts), 1, -1):
m = '.'.join(parts[len(parts) - i:])
if m in self.domains_index:
return self.domains_index[m]
return None
def _create_user(self, launchpad_id, email, gerrit_id, zanata_id,
                 user_name):
    """Build a new user profile dict from the given identifiers

    The company comes from the email domain when it is known, otherwise
    from self._get_independent(). 'gerrit_id' and 'zanata_id' keys are
    added only when the corresponding value is set.

    :return: user profile dict
    """
    company = self._get_company_by_email(email)
    if not company:
        company = self._get_independent()

    emails = [email] if email else []

    user_id = user_processor.make_user_id(
        emails=emails, launchpad_id=launchpad_id, gerrit_id=gerrit_id,
        zanata_id=zanata_id)

    user = {
        'user_id': user_id,
        'launchpad_id': launchpad_id,
        'user_name': user_name or '',
        'companies': [{
            'company_name': company,
            'end_date': 0,
        }],
        'emails': emails,
    }

    # optional identifiers are stored only when present
    for key, value in (('gerrit_id', gerrit_id), ('zanata_id', zanata_id)):
        if value:
            user[key] = value
    return user
def _get_lp_info(self, email):
    """Look up Launchpad ID and user name for an email address

    :param email: user email
    :return: tuple (launchpad id, name); (None, None) when the email is
        not valid or no profile is found
    """
    if utils.check_email_validity(email):
        profile = launchpad_utils.lp_profile_by_email(email)
    else:
        LOG.debug('User email is not valid %s', email)
        profile = None

    if profile:
        LOG.debug('Email %(email)s is mapped to launchpad user %(lp)s',
                  {'email': email, 'lp': profile['name']})
        return profile['name'], profile['display_name']

    LOG.debug('User with email %s not found', email)
    return None, None
def _get_lp_user_name(self, launchpad_id):
if not launchpad_id:
return None
lp_profile = launchpad_utils.lp_profile_by_launchpad_id(launchpad_id)
if not lp_profile:
LOG.debug('User with id %s not found', launchpad_id)
return launchpad_id
return lp_profile['display_name']
def _get_independent(self):
return '*independent'
def _update_user_affiliation(self, user):
for email in user.get('emails'):
company_name = self._get_company_by_email(email)
uc = user['companies']
if (company_name and (len(uc) == 1) and
(uc[0]['company_name'] == self._get_independent())):
LOG.debug('Updating affiliation of user %s to %s',
user['user_id'], company_name)
uc[0]['company_name'] = company_name
break
def _get_user_exact_company(self, user):
if len(user.get('companies', [])) == 1:
return user['companies'][0]['company_name']
return None
def _merge_user_profiles(self, user_profiles):
    """Merge several user profiles into one and drop the leftovers

    For every ordinary field the first non-empty value found in
    user_profiles is taken. Emails and 'core' entries are united. When
    the profiles carry more than one 'seq', all of them except the merged
    one are deleted through user_processor.delete_user.

    :param user_profiles: list of user profiles (may contain empty dicts)
    :return: merged user profile
    """
    LOG.debug('Merge profiles: %s', user_profiles)

    # report profiles that disagree on launchpad or gerrit ids
    launchpad_ids = {p['launchpad_id'] for p in user_profiles
                     if p.get('launchpad_id')}
    if len(launchpad_ids) > 1:
        LOG.debug('Ambiguous launchpad ids: %s on profiles: %s',
                  launchpad_ids, user_profiles)

    gerrit_ids = {p['gerrit_id'] for p in user_profiles
                  if p.get('gerrit_id')}
    if len(gerrit_ids) > 1:
        LOG.debug('Ambiguous gerrit ids: %s on profiles: %s',
                  gerrit_ids, user_profiles)

    merged = {}

    # ordinary fields: first non-empty value wins
    plain_fields = ('seq', 'user_name', 'user_id', 'gerrit_id', 'github_id',
                    'launchpad_id', 'companies', 'static', 'zanata_id')
    for field in plain_fields:
        for profile in user_profiles:
            found = profile.get(field)
            if found:
                merged[field] = found
                break

    # user_id is preferably equal to launchpad_id
    merged['user_id'] = merged.get('launchpad_id') or merged.get('user_id')

    # unite emails and core entries
    all_emails = set()
    all_core = set()
    for profile in user_profiles:
        all_emails.update(profile.get('emails', []))
        all_core.update(profile.get('core', []))
    merged['emails'] = list(all_emails)
    if all_core:
        merged['core'] = list(all_core)

    # companies: prefer the first history that is not a lone
    # independent record
    chosen = merged['companies']
    for profile in user_profiles:
        history = profile.get('companies')
        if not history:
            continue
        if (history[0]['company_name'] != self._get_independent() or
                len(history) > 1):
            chosen = history
            break
    merged['companies'] = chosen

    self._update_user_affiliation(merged)

    known_seqs = {p['seq'] for p in user_profiles if p.get('seq')}
    if len(known_seqs) > 1:
        # several stored profiles were merged: keep one, remove others
        stale_seqs = known_seqs - {merged['seq']}
        for profile in user_profiles:
            if profile.get('seq') in stale_seqs:
                LOG.debug('Delete user: %s', profile)
                user_processor.delete_user(
                    self.runtime_storage_inst, profile)

    return merged
def _need_to_fetch_launchpad(self):
    """True when CONF.fetching_user_source is set to 'launchpad'"""
    source = CONF.fetching_user_source
    return source == 'launchpad'
@ -246,7 +92,7 @@ class RecordProcessor(object):
if (self._need_to_fetch_launchpad() and email and (not user_e) and
(not launchpad_id) and (not user_e.get('launchpad_id'))):
# query LP
launchpad_id, lp_user_name = self._get_lp_info(email)
launchpad_id, lp_user_name = launchpad_utils.query_lp_info(email)
if lp_user_name:
user_name = lp_user_name
@ -258,7 +104,8 @@ class RecordProcessor(object):
(not launchpad_id) and (not user_e.get('launchpad_id'))):
# query LP
guessed_lp_id = gerrit_id
lp_user_name = self._get_lp_user_name(guessed_lp_id)
lp_user_name = launchpad_utils.query_lp_user_name(
guessed_lp_id)
if lp_user_name == user_name:
launchpad_id = guessed_lp_id
else:
@ -272,7 +119,7 @@ class RecordProcessor(object):
(not launchpad_id) and (not user_e.get('launchpad_id'))):
# query LP
guessed_lp_id = zanata_id
user_name = self._get_lp_user_name(guessed_lp_id)
user_name = launchpad_utils.query_lp_user_name(guessed_lp_id)
if user_name != guessed_lp_id:
launchpad_id = guessed_lp_id
else:
@ -281,21 +128,26 @@ class RecordProcessor(object):
user_l = user_processor.load_user(
self.runtime_storage_inst, launchpad_id=launchpad_id) or {}
if ((user_e.get('seq') == user_l.get('seq') == user_g.get('seq') ==
user_z.get('seq')) and user_e.get('seq')):
if user_processor.are_users_same([user_e, user_l, user_g, user_z]):
# If sequence numbers are set and the same, merge is not needed
return user_e
user = self._create_user(launchpad_id, email, gerrit_id, zanata_id,
user_name)
user = user_processor.create_user(
self.domains_index, launchpad_id, email, gerrit_id, zanata_id,
user_name)
if user_e or user_l or user_g or user_z:
user = self._merge_user_profiles(
[user_e, user_l, user_g, user_z, user])
# merge between existing profiles and a new one
user, users_to_delete = user_processor.merge_user_profiles(
self.domains_index, [user_e, user_l, user_g, user_z, user])
# delete all unneeded profiles
user_processor.delete_users(
self.runtime_storage_inst, users_to_delete)
else:
# create new
# create new profile
if (self._need_to_fetch_launchpad() and not user_name):
user_name = self._get_lp_user_name(launchpad_id)
user_name = launchpad_utils.query_lp_user_name(launchpad_id)
if user_name:
user['user_name'] = user_name
LOG.debug('Created new user: %s', user)
@ -312,12 +164,15 @@ class RecordProcessor(object):
if user.get('user_name'):
record['author_name'] = user['user_name']
company, policy = self._find_company(user['companies'], record['date'])
company, policy = user_processor.get_company_for_date(
user['companies'], record['date'])
if not user.get('static'):
# for auto-generated profiles affiliation may be overridden
if company != '*robots' and policy == 'open':
company = (self._get_company_by_email(
record.get('author_email')) or company)
company = (user_processor.get_company_by_email(
self.domains_index, record.get('author_email')) or company)
record['company_name'] = company
def _process_commit(self, record):

View File

@ -1,5 +1,3 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@ -20,6 +18,9 @@ from oslo_log import log as logging
LOG = logging.getLogger(__name__)
INDEPENDENT = '*independent'
ROBOTS = '*robots'
def make_user_id(emails=None, launchpad_id=None, gerrit_id=None,
member_id=None, github_id=None, ci_id=None, zanata_id=None):
@ -74,9 +75,10 @@ def load_user(runtime_storage_inst, seq=None, user_id=None, email=None,
return None
def delete_user(runtime_storage_inst, user):
    """Delete the record stored under key 'user:<seq>' for the given user

    :param runtime_storage_inst: storage object providing delete_by_key
    :param user: user profile, must contain 'seq'
    """
    LOG.debug('Delete user: %s', user)
    storage_key = 'user:%s' % user['seq']
    runtime_storage_inst.delete_by_key(storage_key)
def delete_users(runtime_storage_inst, users):
    """Delete the records stored under key 'user:<seq>' for each user

    :param runtime_storage_inst: storage object providing delete_by_key
    :param users: iterable of user profiles, each must contain 'seq'
    """
    for profile in users:
        LOG.debug('Delete user: %s', profile)
        storage_key = 'user:%s' % profile['seq']
        runtime_storage_inst.delete_by_key(storage_key)
def update_user_profile(stored_user, user):
@ -90,3 +92,161 @@ def update_user_profile(stored_user, user):
updated_user = copy.deepcopy(user)
updated_user['static'] = True
return updated_user
def get_company_for_date(companies, date):
    """Pick the company record that applies at the given date

    :param companies: list of records with 'company_name' and 'end_date'
    :param date: date to compare against each record's 'end_date'
    :return: tuple (company name, policy); policy is 'strict' when a
        record with end_date later than date exists, otherwise 'open'
        and the last record is used
    """
    matched = next((rec for rec in companies if date < rec['end_date']),
                   None)
    if matched is not None:
        return matched['company_name'], 'strict'

    # nothing covers the date; an 'open' result may be overridden
    return companies[-1]['company_name'], 'open'
def get_company_by_email(domains_index, email):
    """Get company based on email domain

    Automatically maps email domain into company name. Prefers
    subdomains to root domains. Domain names are case-insensitive, so
    when a suffix is not found as written its lower-cased form is tried
    as well.

    :param domains_index: dict {domain -> company name}
    :param email: valid email. may be empty
    :return: company name or None if nothing matches
    """
    if not email:
        return None

    _, _, domain = email.partition('@')
    if not domain:
        return None

    labels = domain.split('.')
    # Walk from the full domain towards the root, never going below two
    # labels, so 'a.b.example.com' is tried before 'example.com'.
    for start in range(len(labels) - 1):
        suffix = '.'.join(labels[start:])
        # exact spelling first keeps results for already-matching emails
        for candidate in (suffix, suffix.lower()):
            if candidate in domains_index:
                return domains_index[candidate]
    return None
def create_user(domains_index, launchpad_id, email, gerrit_id, zanata_id,
                user_name):
    """Build a new user profile dict from the given identifiers

    The company comes from the email domain when it is known to
    domains_index, otherwise INDEPENDENT is used. 'gerrit_id' and
    'zanata_id' keys are added only when the corresponding value is set.

    :param domains_index: dict {domain -> company name}
    :return: user profile dict
    """
    company = get_company_by_email(domains_index, email)
    if not company:
        company = INDEPENDENT

    emails = []
    if email:
        emails.append(email)

    user_id = make_user_id(
        emails=emails, launchpad_id=launchpad_id, gerrit_id=gerrit_id,
        zanata_id=zanata_id)

    user = {
        'user_id': user_id,
        'launchpad_id': launchpad_id,
        'user_name': user_name or '',
        'companies': [{
            'company_name': company,
            'end_date': 0,
        }],
        'emails': emails,
    }

    # optional identifiers are stored only when present
    for key, value in (('gerrit_id', gerrit_id), ('zanata_id', zanata_id)):
        if value:
            user[key] = value
    return user
def update_user_affiliation(domains_index, user):
    """Update user affiliation

    Affiliation is updated only if user is currently independent
    but makes contribution from company domain. The profile is changed
    in place; a profile without emails is left untouched.

    :param domains_index: dict {domain -> company name}
    :param user: user profile
    """
    # 'emails' may be absent or None; both mean there is nothing to check
    for email in user.get('emails') or []:
        company_name = get_company_by_email(domains_index, email)
        if not company_name:
            continue

        uc = user['companies']
        # only a lone INDEPENDENT record is treated as auto-assigned and
        # thus safe to replace
        if len(uc) == 1 and uc[0]['company_name'] == INDEPENDENT:
            LOG.debug('Updating affiliation of user %s to %s',
                      user['user_id'], company_name)
            uc[0]['company_name'] = company_name
            break
def merge_user_profiles(domains_index, user_profiles):
    """Merge user profiles into one

    The function merges a list of user profiles into one and figures out
    which profiles can be deleted.

    :param domains_index: dict {domain -> company name}
    :param user_profiles: user profiles to merge (may contain empty dicts)
    :return: tuple (merged user profile, [user profiles to delete]); the
        delete list holds at most one profile per 'seq'
    """
    LOG.debug('Merge profiles: %s', user_profiles)

    # check whether there is more than one launchpad_id or gerrit_id
    lp_ids = {u['launchpad_id'] for u in user_profiles
              if u.get('launchpad_id')}
    if len(lp_ids) > 1:
        LOG.debug('Ambiguous launchpad ids: %s on profiles: %s',
                  lp_ids, user_profiles)

    g_ids = {u['gerrit_id'] for u in user_profiles if u.get('gerrit_id')}
    if len(g_ids) > 1:
        LOG.debug('Ambiguous gerrit ids: %s on profiles: %s',
                  g_ids, user_profiles)

    merged_user = {}  # merged user profile

    # collect ordinary fields: the first non-empty value wins
    for key in ['seq', 'user_name', 'user_id', 'gerrit_id', 'github_id',
                'launchpad_id', 'companies', 'static', 'zanata_id']:
        value = next((v.get(key) for v in user_profiles if v.get(key)),
                     None)
        if value:
            merged_user[key] = value

    # update user_id, prefer it to be equal to launchpad_id
    merged_user['user_id'] = (merged_user.get('launchpad_id') or
                              merged_user.get('user_id'))

    # merge emails
    emails = set()
    core_in = set()
    for u in user_profiles:
        emails |= set(u.get('emails', []))
        core_in |= set(u.get('core', []))
    merged_user['emails'] = list(emails)
    if core_in:
        merged_user['core'] = list(core_in)

    # merge companies: prefer the first history that is not a lone
    # INDEPENDENT record
    merged_companies = merged_user['companies']
    for u in user_profiles:
        companies = u.get('companies')
        if companies:
            if (companies[0]['company_name'] != INDEPENDENT or
                    len(companies) > 1):
                merged_companies = companies
                break
    merged_user['companies'] = merged_companies

    update_user_affiliation(domains_index, merged_user)

    users_to_delete = []
    seqs = {u['seq'] for u in user_profiles if u.get('seq')}

    if len(seqs) > 1:
        # profiles are merged, keep only one, remove others
        seqs.remove(merged_user['seq'])
        for u in user_profiles:
            seq = u.get('seq')
            if seq in seqs:
                users_to_delete.append(u)
                # the same stored user may appear several times in
                # user_profiles; schedule each seq for deletion once
                seqs.remove(seq)

    return merged_user, users_to_delete
def are_users_same(users):
    """True if every user carries the same 'seq' and none lacks one

    :param users: list of user profiles
    :return: True when all profiles share one 'seq' value that is not
        None, False otherwise (including for an empty list)
    """
    seqs = {profile.get('seq') for profile in users}
    if None in seqs:
        return False
    return len(seqs) == 1

View File

@ -0,0 +1,46 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
from stackalytics.processor import launchpad_utils
class TestLaunchpadUtils(testtools.TestCase):
    """Tests for launchpad_utils.query_lp_info with the profile lookup
    (_lp_profile_by_email) replaced by a mock.
    """

    @mock.patch('stackalytics.processor.launchpad_utils._lp_profile_by_email')
    def test_get_lp_info(self, lp_mock):
        # a found profile is reported as (name, display_name)
        email = 'john@smith.to'
        lp_mock.return_value = {'name': 'john', 'display_name': 'smith'}

        result = launchpad_utils.query_lp_info(email)

        self.assertEqual(('john', 'smith'), result)
        lp_mock.assert_called_once_with('john@smith.to')

    @mock.patch('stackalytics.processor.launchpad_utils._lp_profile_by_email')
    def test_get_lp_info_not_found(self, lp_mock):
        # the lookup is made, but no profile comes back
        email = 'john@smith.to'
        lp_mock.return_value = None

        result = launchpad_utils.query_lp_info(email)

        self.assertEqual((None, None), result)
        lp_mock.assert_called_once_with('john@smith.to')

    @mock.patch('stackalytics.processor.launchpad_utils._lp_profile_by_email')
    def test_get_lp_info_invalid_email(self, lp_mock):
        # an invalid email must not reach the profile lookup at all
        result = launchpad_utils.query_lp_info('error.root')

        self.assertEqual((None, None), result)
        lp_mock.assert_not_called()

View File

@ -24,6 +24,7 @@ from stackalytics.processor import config
from stackalytics.processor import record_processor
from stackalytics.processor import runtime_storage
from stackalytics.processor import user_processor
from stackalytics.processor.user_processor import get_company_by_email
from stackalytics.processor import utils
@ -62,12 +63,12 @@ class TestRecordProcessor(testtools.TestCase):
self.read_launchpad = self.read_json_from_uri_patch.start()
self.lp_profile_by_launchpad_id_patch = mock.patch(
'stackalytics.processor.launchpad_utils.'
'lp_profile_by_launchpad_id')
'_lp_profile_by_launchpad_id')
self.lp_profile_by_launchpad_id = (
self.lp_profile_by_launchpad_id_patch.start())
self.lp_profile_by_launchpad_id.return_value = None
self.lp_profile_by_email_patch = mock.patch(
'stackalytics.processor.launchpad_utils.lp_profile_by_email')
'stackalytics.processor.launchpad_utils._lp_profile_by_email')
self.lp_profile_by_email = (
self.lp_profile_by_email_patch.start())
self.lp_profile_by_email.return_value = None
@ -86,7 +87,7 @@ class TestRecordProcessor(testtools.TestCase):
companies=[{'company_name': 'IBM', 'domains': ['ibm.com']}]
)
email = 'jdoe@ibm.com'
res = record_processor_inst._get_company_by_email(email)
res = get_company_by_email(record_processor_inst.domains_index, email)
self.assertEqual('IBM', res)
def test_get_company_by_email_with_long_suffix_mapped(self):
@ -94,7 +95,7 @@ class TestRecordProcessor(testtools.TestCase):
companies=[{'company_name': 'NEC', 'domains': ['nec.co.jp']}]
)
email = 'man@mxw.nes.nec.co.jp'
res = record_processor_inst._get_company_by_email(email)
res = get_company_by_email(record_processor_inst.domains_index, email)
self.assertEqual('NEC', res)
def test_get_company_by_email_with_long_suffix_mapped_2(self):
@ -103,23 +104,15 @@ class TestRecordProcessor(testtools.TestCase):
'domains': ['nec.co.jp', 'nec.com']}]
)
email = 'man@mxw.nes.nec.com'
res = record_processor_inst._get_company_by_email(email)
res = get_company_by_email(record_processor_inst.domains_index, email)
self.assertEqual('NEC', res)
def test_get_company_by_email_not_mapped(self):
record_processor_inst = self.make_record_processor()
email = 'foo@boo.com'
res = record_processor_inst._get_company_by_email(email)
res = get_company_by_email(record_processor_inst.domains_index, email)
self.assertIsNone(res)
# get_lp_info
def test_get_lp_info_invalid_email(self):
self.read_launchpad.return_value = None
record_processor_inst = self.make_record_processor(users=[])
self.assertEqual((None, None),
record_processor_inst._get_lp_info('error.root'))
# commit processing
def test_process_commit_existing_user(self):

View File

@ -85,3 +85,34 @@ class TestUserProcessor(testtools.TestCase):
updated_user = user_processor.update_user_profile(stored_user, user)
self.assertTrue(updated_user.get('static'))
def test_are_users_same(self):
    # three profiles sharing one seq are the same user
    profiles = [{'seq': 1} for _ in range(3)]
    self.assertTrue(user_processor.are_users_same(profiles))
self.assertTrue(user_processor.are_users_same(users))
def test_are_users_same_none(self):
users = [
{},
{},
]
self.assertFalse(user_processor.are_users_same(users))
def test_are_users_not_same(self):
users = [
dict(seq=1),
dict(seq=2),
dict(seq=1),
]
self.assertFalse(user_processor.are_users_same(users))
def test_are_users_not_same_2(self):
users = [
dict(seq=1),
dict(seq=1),
{}
]
self.assertFalse(user_processor.are_users_same(users))