diff --git a/dashboard/web.py b/dashboard/web.py index 43a2b4c84..28ae084fe 100644 --- a/dashboard/web.py +++ b/dashboard/web.py @@ -190,7 +190,7 @@ def is_project_type_valid(project_type): def get_user_from_runtime_storage(user_id): runtime_storage_inst = get_vault()['runtime_storage'] - return runtime_storage_inst.get_by_key('user:%s' % user_id) + return utils.load_user(runtime_storage_inst, user_id) # Utils --------- diff --git a/etc/stackalytics.conf b/etc/stackalytics.conf index d459447ad..b539b1502 100644 --- a/etc/stackalytics.conf +++ b/etc/stackalytics.conf @@ -28,3 +28,6 @@ # SSH username for gerrit review system access # ssh_username = user + +# Forcibly read default data and update records +# force_update = False diff --git a/stackalytics/processor/config.py b/stackalytics/processor/config.py index 1ba498d70..fdee63ae3 100644 --- a/stackalytics/processor/config.py +++ b/stackalytics/processor/config.py @@ -37,4 +37,6 @@ OPTS = [ help='SSH key for gerrit review system access'), cfg.StrOpt('ssh-username', default='user', help='SSH username for gerrit review system access'), + cfg.BoolOpt('force-update', default=False, + help='Forcibly read default data and update records'), ] diff --git a/stackalytics/processor/default_data_processor.py b/stackalytics/processor/default_data_processor.py index b736dd07d..f3e2637ed 100644 --- a/stackalytics/processor/default_data_processor.py +++ b/stackalytics/processor/default_data_processor.py @@ -21,6 +21,7 @@ from github import MainClass from stackalytics.openstack.common import log as logging from stackalytics.processor import normalizer from stackalytics.processor import record_processor +from stackalytics.processor import utils from stackalytics.processor import vcs LOG = logging.getLogger(__name__) @@ -84,7 +85,7 @@ def _retrieve_project_list(runtime_storage_inst, project_sources): def _process_users(runtime_storage_inst, users): users_index = {} for user in users: - runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user) + utils.store_user(runtime_storage_inst, user) if 'user_id' in user: users_index[user['user_id']] = user if 'launchpad_id' in user: @@ -119,15 +120,17 @@ def _update_default_data(runtime_storage_inst, default_data): runtime_storage_inst.set_by_key(key, default_data[key]) -def process(runtime_storage_inst, default_data, sources_root): +def process(runtime_storage_inst, default_data, sources_root, force_update): LOG.debug('Process default data') normalizer.normalize_default_data(default_data) - if _check_default_data_change(runtime_storage_inst, default_data): + if (_check_default_data_change(runtime_storage_inst, default_data) or + force_update): _update_default_data(runtime_storage_inst, default_data) + LOG.debug('Gather release index for all repos') release_index = {} for repo in runtime_storage_inst.get_by_key('repos'): vcs_inst = vcs.get_vcs(repo, sources_root) @@ -135,6 +138,14 @@ def process(runtime_storage_inst, default_data, sources_root): record_processor_inst = record_processor.RecordProcessor( runtime_storage_inst) + # need to iterate over full view of records and generate valid + # users profiles + LOG.debug('Iterate all records to create valid users profiles') + for record in record_processor_inst.update( + runtime_storage_inst.get_all_records(), release_index): + pass + # update records according to generated users profiles + LOG.debug('Update all records according to users profiles') updated_records = record_processor_inst.update( runtime_storage_inst.get_all_records(), release_index) runtime_storage_inst.set_records(updated_records) diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index 2f68f63df..52a95c29b 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -122,6 +122,8 @@ def update_repos(runtime_storage_inst): for repo in repos: process_repo(repo, runtime_storage_inst, record_processor_inst) + record_processor_inst.finalize() + def apply_corrections(uri, runtime_storage_inst): LOG.info('Applying corrections from uri %s', uri) @@ -158,7 +160,8 @@ def main(): return not 0 default_data_processor.process(runtime_storage_inst, default_data, - cfg.CONF.sources_root) + cfg.CONF.sources_root, + cfg.CONF.force_update) update_pids(runtime_storage_inst) diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index ceeb3791d..af70bdf09 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -34,6 +34,8 @@ class RecordProcessor(object): self.releases = runtime_storage_inst.get_by_key('releases') self.releases_dates = [r['end_date'] for r in self.releases] + self.updated_users = set() + def _get_release(self, timestamp): release_index = bisect.bisect(self.releases_dates, timestamp) return self.releases[release_index]['release_name'] @@ -71,9 +73,6 @@ class RecordProcessor(object): LOG.debug('Create new user: %s', user) return user - def _store_user(self, user): - self.runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user) - def _get_lp_info(self, email): lp_profile = None if not re.match(r'[\w\d_\.-]+@([\w\d_\.-]+\.)+[\w]+', email): @@ -88,11 +87,23 @@ class RecordProcessor(object): LOG.debug('User with email %s not found', email) return None, None + LOG.debug('Email is mapped to launchpad user: %s', lp_profile['name']) return lp_profile['name'], lp_profile['display_name'] def _get_independent(self): return self.domains_index[''] + def _update_user(self, user, email): + LOG.debug('Add email %s to user %s', email, user['user_id']) + user['emails'].append(email) + company_name = self._get_company_by_email(email) + if ((company_name) and (len(user['companies']) == 1) and + (user['companies'][0]['company_name'] != company_name)): + LOG.debug('Updating affiliation of user %s to %s', + user['user_id'], company_name) + user['companies'][0]['company_name'] = company_name + self.updated_users.add(user['user_id']) + def _update_record_and_user(self, record): email = record['author_email'].lower() record['author_email'] = email @@ -102,23 +113,23 @@ class RecordProcessor(object): record['launchpad_id'] = user['launchpad_id'] else: if ('launchpad_id' in record) and (record['launchpad_id']): - user = self._create_user(record['launchpad_id'], email, - record['author_name']) + launchpad_id = record['launchpad_id'] + user_name = record['author_name'] else: launchpad_id, user_name = self._get_lp_info(email) record['launchpad_id'] = launchpad_id - if (launchpad_id) and (launchpad_id in self.users_index): - # merge emails - user = self.users_index[launchpad_id] - user['emails'].append(email) - else: - # create new - if not user_name: - user_name = record['author_name'] - user = self._create_user(launchpad_id, email, user_name) + if (launchpad_id) and (launchpad_id in self.users_index): + # merge emails + user = self.users_index[launchpad_id] + self._update_user(user, email) + else: + # create new + if not user_name: + user_name = record['author_name'] + user = self._create_user(launchpad_id, email, user_name) - self._store_user(user) + utils.store_user(self.runtime_storage_inst, user) self.users_index[email] = user if user['launchpad_id']: self.users_index[user['launchpad_id']] = user @@ -257,3 +268,19 @@ class RecordProcessor(object): yield record self.runtime_storage_inst.set_by_key('users', self.users_index) + + def _get_records_for_users_to_update(self): + for record in self.runtime_storage_inst.get_all_records(): + user_id = record['user_id'] + if user_id in self.updated_users: + user = self.users_index[user_id] + user_company_name = user['companies'][0]['company_name'] + if record['company_name'] != user_company_name: + LOG.debug('Record company will be changed to: %s', + user_company_name) + record['company_name'] = user_company_name + yield record + + def finalize(self): + self.runtime_storage_inst.set_records( + self._get_records_for_users_to_update()) diff --git a/stackalytics/processor/utils.py b/stackalytics/processor/utils.py index 544a7dfca..cc871c5ea 100644 --- a/stackalytics/processor/utils.py +++ b/stackalytics/processor/utils.py @@ -50,3 +50,11 @@ def read_json_from_uri(uri): return json.loads(raw) except Exception as e: LOG.warn('Error while reading uri: %s' % e) + + +def store_user(runtime_storage_inst, user): + runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user) + + +def load_user(runtime_storage_inst, user_id): + return runtime_storage_inst.get_by_key('user:%s' % user_id) diff --git a/tests/unit/test_record_processor.py b/tests/unit/test_record_processor.py index c8e76a241..254258f75 100644 --- a/tests/unit/test_record_processor.py +++ b/tests/unit/test_record_processor.py @@ -23,236 +23,214 @@ from stackalytics.processor import utils LP_URI = 'https://api.launchpad.net/1.0/people/?ws.op=getByEmail&email=%s' +COMPANIES = [ + { + 'company_name': 'SuperCompany', + 'domains': ['super.com', 'super.no'] + }, + { + "domains": ["nec.com", "nec.co.jp"], + "company_name": "NEC" + }, + { + 'company_name': '*independent', + 'domains': [''] + }, +] -def _make_users(users): - users_index = {} - for user in users: - if 'user_id' in user: - users_index[user['user_id']] = user - if 'launchpad_id' in user: - users_index[user['launchpad_id']] = user - for email in user['emails']: - users_index[email] = user - return users_index +USERS = [ + { + 'user_id': 'john_doe', + 'launchpad_id': 'john_doe', + 'user_name': 'John Doe', + 'emails': ['johndoe@gmail.com', 'jdoe@super.no'], + 'companies': [ + {'company_name': '*independent', + 'end_date': 1234567890}, + {'company_name': 'SuperCompany', + 'end_date': 0}, + ] + } +] - -def _make_companies(companies): - domains_index = {} - for company in companies: - for domain in company['domains']: - domains_index[domain] = company['company_name'] - return domains_index +RELEASES = [ + { + 'release_name': 'prehistory', + 'end_date': utils.date_to_timestamp('2011-Apr-21') + }, + { + 'release_name': 'Diablo', + 'end_date': utils.date_to_timestamp('2011-Sep-08') + }, + { + 'release_name': 'Zoo', + 'end_date': utils.date_to_timestamp('2035-Sep-08') + }, +] class TestRecordProcessor(testtools.TestCase): def setUp(self): super(TestRecordProcessor, self).setUp() - - companies = [ - { - 'company_name': 'SuperCompany', - 'domains': ['super.com', 'super.no'] - }, - { - "domains": ["nec.com", "nec.co.jp"], - "company_name": "NEC" - }, - { - 'company_name': '*independent', - 'domains': [''] - }, - ] - - self.user = { - 'user_id': 'john_doe', - 'launchpad_id': 'john_doe', - 'user_name': 'John Doe', - 'emails': ['johndoe@gmail.com', 'jdoe@super.no'], - 'companies': [ - {'company_name': '*independent', - 'end_date': 1234567890}, - {'company_name': 'SuperCompany', - 'end_date': 0}, - ] - } - self.get_users = mock.Mock(return_value=[ - self.user, - ]) - - releases = [ - { - 'release_name': 'prehistory', - 'end_date': utils.date_to_timestamp('2011-Apr-21') - }, - { - 'release_name': 'Diablo', - 'end_date': utils.date_to_timestamp('2011-Sep-08') - }, - { - 'release_name': 'Zoo', - 'end_date': utils.date_to_timestamp('2035-Sep-08') - }, - ] - - def get_by_key(table): - if table == 'companies': - return _make_companies(companies) - elif table == 'users': - return _make_users(self.get_users()) - elif table == 'releases': - return releases - else: - raise Exception('Wrong table %s' % table) - - p_storage = mock.Mock(runtime_storage.RuntimeStorage) - p_storage.get_by_key = mock.Mock(side_effect=get_by_key) - - self.runtime_storage = p_storage - self.commit_processor = record_processor.RecordProcessor(p_storage) self.read_json_from_uri_patch = mock.patch( 'stackalytics.processor.utils.read_json_from_uri') - self.read_json = self.read_json_from_uri_patch.start() + self.read_launchpad = self.read_json_from_uri_patch.start() def tearDown(self): super(TestRecordProcessor, self).tearDown() self.read_json_from_uri_patch.stop() - def _generate_commits(self, email='johndoe@gmail.com', date=1999999999): - yield { - 'record_type': 'commit', - 'commit_id': 'de7e8f297c193fb310f22815334a54b9c76a0be1', - 'author_name': 'John Doe', - 'author_email': email, - 'date': date, - 'lines_added': 25, - 'lines_deleted': 9, - 'release_name': 'havana', - } - def test_get_company_by_email_mapped(self): + record_processor_inst = make_record_processor() email = 'jdoe@super.no' - res = self.commit_processor._get_company_by_email(email) + res = record_processor_inst._get_company_by_email(email) self.assertEquals('SuperCompany', res) def test_get_company_by_email_with_long_suffix_mapped(self): + record_processor_inst = make_record_processor() email = 'man@mxw.nes.nec.co.jp' - res = self.commit_processor._get_company_by_email(email) + res = record_processor_inst._get_company_by_email(email) self.assertEquals('NEC', res) def test_get_company_by_email_with_long_suffix_mapped_2(self): + record_processor_inst = make_record_processor() email = 'man@mxw.nes.nec.com' - res = self.commit_processor._get_company_by_email(email) + res = record_processor_inst._get_company_by_email(email) self.assertEquals('NEC', res) def test_get_company_by_email_not_mapped(self): + record_processor_inst = make_record_processor() email = 'foo@boo.com' - res = self.commit_processor._get_company_by_email(email) + res = record_processor_inst._get_company_by_email(email) self.assertEquals(None, res) def test_update_commit_existing_user(self): - commit_generator = self._generate_commits() - commit = list(self.commit_processor.process(commit_generator))[0] + record_processor_inst = make_record_processor() + commit_generator = generate_commits() + commit = list(record_processor_inst.process(commit_generator))[0] self.assertEquals('SuperCompany', commit['company_name']) self.assertEquals('john_doe', commit['launchpad_id']) def test_update_commit_existing_user_old_job(self): - commit_generator = self._generate_commits(date=1000000000) - commit = list(self.commit_processor.process(commit_generator))[0] + record_processor_inst = make_record_processor() + commit_generator = generate_commits(date=1000000000) + commit = list(record_processor_inst.process(commit_generator))[0] self.assertEquals('*independent', commit['company_name']) self.assertEquals('john_doe', commit['launchpad_id']) def test_update_commit_existing_user_new_email_known_company(self): - """ - User is known to LP, his email is new to us, and maps to other company - Should return other company instead of those mentioned in user db - """ + # User is known to LP, his email is new to us, and maps to other + # company. Should return other company instead of those mentioned + # in user db email = 'johndoe@nec.co.jp' - commit_generator = self._generate_commits(email=email) + commit_generator = generate_commits(email=email) launchpad_id = 'john_doe' - self.read_json.return_value = {'name': launchpad_id, - 'display_name': launchpad_id} - user = self.user.copy() - # tell storage to return existing user - self.get_users.return_value = [user] + self.read_launchpad.return_value = {'name': launchpad_id, + 'display_name': launchpad_id} + user = make_user() + record_processor_inst = make_record_processor( + make_runtime_storage(users=[user])) - commit = list(self.commit_processor.process(commit_generator))[0] + commit = list(record_processor_inst.process(commit_generator))[0] - self.runtime_storage.set_by_key.assert_called_with('users', mock.ANY) - self.read_json.assert_called_once_with(LP_URI % email) + self.read_launchpad.assert_called_once_with(LP_URI % email) self.assertIn(email, user['emails']) self.assertEquals('NEC', commit['company_name']) self.assertEquals(launchpad_id, commit['launchpad_id']) def test_update_commit_existing_user_new_email_unknown_company(self): - """ - User is known to LP, but his email is new to us. Should match - the user and return current company - """ + # User is known to LP, but his email is new to us. Should match + # the user and return current company email = 'johndoe@yahoo.com' - commit_generator = self._generate_commits(email=email) + commit_generator = generate_commits(email=email) launchpad_id = 'john_doe' - self.read_json.return_value = {'name': launchpad_id, - 'display_name': launchpad_id} - user = self.user.copy() - # tell storage to return existing user - self.get_users.return_value = [user] + self.read_launchpad.return_value = {'name': launchpad_id, + 'display_name': launchpad_id} + user = make_user() + record_processor_inst = make_record_processor( + make_runtime_storage(users=[user])) - commit = list(self.commit_processor.process(commit_generator))[0] + commit = list(record_processor_inst.process(commit_generator))[0] - self.runtime_storage.set_by_key.assert_called_with('users', mock.ANY) - self.read_json.assert_called_once_with(LP_URI % email) + self.read_launchpad.assert_called_once_with(LP_URI % email) self.assertIn(email, user['emails']) self.assertEquals('SuperCompany', commit['company_name']) self.assertEquals(launchpad_id, commit['launchpad_id']) + def test_update_commit_existing_user_new_email_known_company_update(self): + # User is known to LP, his email is new to us and belongs to company B. + # Should match the user and return company B and update user + email = 'johndoe@nec.com' + commit_generator = generate_commits(email=email) + launchpad_id = 'john_doe' + self.read_launchpad.return_value = {'name': launchpad_id, + 'display_name': launchpad_id} + user = { + 'user_id': 'john_doe', + 'launchpad_id': launchpad_id, + 'user_name': 'John Doe', + 'emails': ['johndoe@gmail.com'], + 'companies': [ + {'company_name': '*independent', 'end_date': 0} + ] + } + record_processor_inst = make_record_processor( + make_runtime_storage(users=[user])) + + commit = list(record_processor_inst.process(commit_generator))[0] + + self.read_launchpad.assert_called_once_with(LP_URI % email) + self.assertIn(email, user['emails']) + self.assertEquals('NEC', user['companies'][0]['company_name'], + message='User affiliation should be updated') + self.assertEquals('NEC', commit['company_name']) + self.assertEquals(launchpad_id, commit['launchpad_id']) + def test_update_commit_new_user(self): - """ - User is known to LP, but new to us - Should add new user and set company depending on email - """ + # User is known to LP, but new to us + # Should add new user and set company depending on email email = 'smith@nec.com' - commit_generator = self._generate_commits(email=email) + commit_generator = generate_commits(email=email) launchpad_id = 'smith' - self.read_json.return_value = {'name': launchpad_id, - 'display_name': 'Smith'} - self.get_users.return_value = [] + self.read_launchpad.return_value = {'name': launchpad_id, + 'display_name': 'Smith'} + record_processor_inst = make_record_processor( + make_runtime_storage(users=[])) - commit = list(self.commit_processor.process(commit_generator))[0] + commit = list(record_processor_inst.process(commit_generator))[0] - self.read_json.assert_called_once_with(LP_URI % email) + self.read_launchpad.assert_called_once_with(LP_URI % email) self.assertEquals('NEC', commit['company_name']) self.assertEquals(launchpad_id, commit['launchpad_id']) def test_update_commit_new_user_unknown_to_lb(self): - """ - User is new to us and not known to LP - Should set user name and empty LPid - """ + # User is new to us and not known to LP + # Should set user name and empty LPid email = 'inkognito@avs.com' - commit_generator = self._generate_commits(email=email) - self.read_json.return_value = None - self.get_users.return_value = [] + commit_generator = generate_commits(email=email) + self.read_launchpad.return_value = None + record_processor_inst = make_record_processor( + make_runtime_storage(users=[])) - commit = list(self.commit_processor.process(commit_generator))[0] + commit = list(record_processor_inst.process(commit_generator))[0] - self.read_json.assert_called_once_with(LP_URI % email) + self.read_launchpad.assert_called_once_with(LP_URI % email) self.assertEquals('*independent', commit['company_name']) self.assertEquals(None, commit['launchpad_id']) def test_update_commit_invalid_email(self): - """ - User's email is malformed - """ + # User's email is malformed email = 'error.root' - commit_generator = self._generate_commits(email=email) - self.read_json.return_value = None - self.get_users.return_value = [] + commit_generator = generate_commits(email=email) + self.read_launchpad.return_value = None + record_processor_inst = make_record_processor( + make_runtime_storage(users=[])) - commit = list(self.commit_processor.process(commit_generator))[0] + commit = list(record_processor_inst.process(commit_generator))[0] - self.assertEquals(0, self.read_json.called) + self.assertEquals(0, self.read_launchpad.called) self.assertEquals('*independent', commit['company_name']) self.assertEquals(None, commit['launchpad_id']) @@ -285,8 +263,81 @@ class TestRecordProcessor(testtools.TestCase): def test_update_record_no_changes(self): commit_generator = self._generate_record_commit() release_index = {'0afdc64bfd041b03943ceda7849c4443940b6053': 'havana'} + record_processor_inst = make_record_processor( + make_runtime_storage(users=[])) - updated = list(self.commit_processor.update(commit_generator, + updated = list(record_processor_inst.update(commit_generator, release_index)) self.assertEquals(0, len(updated)) + + +# Helpers + +def generate_commits(email='johndoe@gmail.com', date=1999999999): + yield { + 'record_type': 'commit', + 'commit_id': 'de7e8f297c193fb310f22815334a54b9c76a0be1', + 'author_name': 'John Doe', + 'author_email': email, + 'date': date, + 'lines_added': 25, + 'lines_deleted': 9, + 'release_name': 'havana', + } + + +def make_runtime_storage(users=None, companies=None, releases=None): + def get_by_key(table): + if table == 'companies': + return _make_companies(companies or COMPANIES) + elif table == 'users': + return _make_users(users or USERS) + elif table == 'releases': + return releases or RELEASES + else: + raise Exception('Wrong table %s' % table) + + rs = mock.Mock(runtime_storage.RuntimeStorage) + rs.get_by_key = mock.Mock(side_effect=get_by_key) + return rs + + +def make_record_processor(runtime_storage_inst=None): + return record_processor.RecordProcessor(runtime_storage_inst or + make_runtime_storage()) + + +def make_user(): + return { + 'user_id': 'john_doe', + 'launchpad_id': 'john_doe', + 'user_name': 'John Doe', + 'emails': ['johndoe@gmail.com', 'jdoe@super.no'], + 'companies': [ + {'company_name': '*independent', + 'end_date': 1234567890}, + {'company_name': 'SuperCompany', + 'end_date': 0}, + ] + } + + +def _make_users(users): + users_index = {} + for user in users: + if 'user_id' in user: + users_index[user['user_id']] = user + if 'launchpad_id' in user: + users_index[user['launchpad_id']] = user + for email in user['emails']: + users_index[email] = user + return users_index + + +def _make_companies(companies): + domains_index = {} + for company in companies: + for domain in company['domains']: + domains_index[domain] = company['company_name'] + return domains_index