diff --git a/stackalytics/processor/default_data_processor.py b/stackalytics/processor/default_data_processor.py index f1f101c06..f8033e4ce 100644 --- a/stackalytics/processor/default_data_processor.py +++ b/stackalytics/processor/default_data_processor.py @@ -114,26 +114,31 @@ def _update_with_driverlog_data(default_data, driverlog_data_uri): LOG.info('Reading DriverLog data from uri: %s', driverlog_data_uri) driverlog_data = utils.read_json_from_uri(driverlog_data_uri) - cis = {} + module_ci_ids = {} + ci_ids = set() for driver in driverlog_data['drivers']: if 'ci' in driver: module = driver['project_id'].split('/')[1] - if module not in cis: - cis[module] = {} - cis[module][driver['ci']['id']] = driver + if module not in module_ci_ids: + module_ci_ids[module] = {} + ci_id = driver['ci']['id'] + module_ci_ids[module][ci_id] = driver - default_data['users'].append({ - 'gerrit_id': driver['ci']['id'], - 'user_name': driver['ci']['id'], - 'static': True, - 'companies': [ - {'company_name': driver['vendor'], 'end_date': None}], - }) + if ci_id not in ci_ids: + ci_ids.add(ci_id) + default_data['users'].append({ + 'user_id': user_processor.make_user_id(gerrit_id=ci_id), + 'gerrit_id': ci_id, + 'user_name': ci_id, + 'static': True, + 'companies': [ + {'company_name': driver['vendor'], 'end_date': None}], + }) for repo in default_data['repos']: - if repo['module'] in cis: - repo['ci'] = cis[repo['module']] + if repo['module'] in module_ci_ids: + repo['ci'] = module_ci_ids[repo['module']] def _store_users(runtime_storage_inst, users): diff --git a/stackalytics/processor/dump.py b/stackalytics/processor/dump.py index 891b033ed..b338a94e7 100644 --- a/stackalytics/processor/dump.py +++ b/stackalytics/processor/dump.py @@ -110,7 +110,7 @@ def export_data(memcached_inst, fd): key_prefix = key + ':' - for record_id_set in utils.make_range(0, count, BULK_READ_SIZE): + for record_id_set in utils.make_range(0, count + 1, BULK_READ_SIZE): # memcache limits the size of returned 
data to specific yet unknown # chunk size, the code should verify that all requested records are # returned an be able to fall back to one-by-one retrieval @@ -126,13 +126,17 @@ def export_data(memcached_inst, fd): for k, v in six.iteritems(chunk): pickle.dump((key_prefix + str(k), v), fd) - for user_seq in range(memcached_inst.get('user:count') or 0): + for user_seq in range((memcached_inst.get('user:count') or 0) + 1): user = memcached_inst.get('user:%s' % user_seq) if user: if user.get('user_id'): pickle.dump(('user:%s' % user['user_id'], user), fd) if user.get('launchpad_id'): pickle.dump(('user:%s' % user['launchpad_id'], user), fd) + if user.get('gerrit_id'): + pickle.dump(('user:gerrit:%s' % user['gerrit_id'], user), fd) + if user.get('member_id'): + pickle.dump(('user:member:%s' % user['member_id'], user), fd) for email in user.get('emails') or []: pickle.dump(('user:%s' % email, user), fd) diff --git a/stackalytics/processor/normalizer.py b/stackalytics/processor/normalizer.py index 10ed5529c..7a8615d14 100644 --- a/stackalytics/processor/normalizer.py +++ b/stackalytics/processor/normalizer.py @@ -39,7 +39,8 @@ def _normalize_user(user): user['companies'].sort(key=utils.cmp_to_key(end_date_comparator)) user['user_id'] = user_processor.make_user_id( launchpad_id=user.get('launchpad_id'), - email=user.get('email')) + emails=user.get('emails'), + gerrit_id=user.get('gerrit_id')) def _normalize_users(users): diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index 6777f6f09..de610eea0 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -98,22 +98,22 @@ class RecordProcessor(object): def _create_user(self, launchpad_id, email, gerrit_id, user_name): company = (self._get_company_by_email(email) or self._get_independent()) + emails = [] + if email: + emails = [email] user = { 'user_id': user_processor.make_user_id( - email=email, launchpad_id=launchpad_id, 
gerrit_id=gerrit_id), + emails=emails, launchpad_id=launchpad_id, gerrit_id=gerrit_id), 'launchpad_id': launchpad_id, 'user_name': user_name or '', 'companies': [{ 'company_name': company, 'end_date': 0, }], + 'emails': emails, } if gerrit_id: user['gerrit_id'] = gerrit_id - if email: - user['emails'] = [email] - else: - user['emails'] = [] return user def _get_lp_info(self, email): @@ -163,6 +163,20 @@ class RecordProcessor(object): return None def _merge_user_profiles(self, user_profiles): + LOG.debug('Merge profiles: %s', user_profiles) + + # check of there are more than 1 launchpad_id nor gerrit_id + lp_ids = set(u.get('launchpad_id') for u in user_profiles + if u.get('launchpad_id')) + if len(lp_ids) > 1: + LOG.info('Ambiguous launchpad ids: %s on profiles: %s', + (lp_ids, user_profiles)) + g_ids = set(u.get('gerrit_id') for u in user_profiles + if u.get('gerrit_id')) + if len(g_ids) > 1: + LOG.info('Ambiguous gerrit ids: %s on profiles: %s', + (g_ids, user_profiles)) + merged_user = {} # merged user profile # collect ordinary fields @@ -219,7 +233,8 @@ class RecordProcessor(object): user_name = record.get('author_name') launchpad_id = record.get('launchpad_id') - if email and (not user_e) and (not launchpad_id): + if (email and (not user_e) and (not launchpad_id) and + (not user_e.get('launchpad_id'))): # query LP launchpad_id, lp_user_name = self._get_lp_info(email) if lp_user_name: @@ -229,7 +244,8 @@ class RecordProcessor(object): if gerrit_id: user_g = user_processor.load_user( self.runtime_storage_inst, gerrit_id=gerrit_id) or {} - if (not user_g) and (not launchpad_id): + if ((not user_g) and (not launchpad_id) and + (not user_e.get('launchpad_id'))): # query LP guessed_lp_id = gerrit_id lp_user_name = self._get_lp_user_name(guessed_lp_id) @@ -241,13 +257,13 @@ class RecordProcessor(object): user_l = user_processor.load_user( self.runtime_storage_inst, launchpad_id=launchpad_id) or {} - user = self._create_user(launchpad_id, email, gerrit_id, user_name) 
- if ((user_e.get('seq') == user_l.get('seq') == user_g.get('seq')) and user_e.get('seq')): # sequence numbers are set and the same, merge is not needed user = user_e else: + user = self._create_user(launchpad_id, email, gerrit_id, user_name) + if user_e or user_l or user_g: user = self._merge_user_profiles( [user_e, user_l, user_g, user]) @@ -268,8 +284,6 @@ class RecordProcessor(object): user = self.update_user(record) record['user_id'] = user['user_id'] - if user.get('launchpad_id'): - record['launchpad_id'] = user['launchpad_id'] if user.get('user_name'): record['author_name'] = user['user_name'] @@ -535,7 +549,7 @@ class RecordProcessor(object): ci_vote['primary_key'] = ('%s:%s' % (reviewer['username'], ci_vote['date'])) ci_vote['user_id'] = reviewer['username'] - ci_vote['launchpad_id'] = reviewer['username'] + ci_vote['gerrit_id'] = reviewer['username'] ci_vote['author_name'] = reviewer.get('name') or reviewer['username'] ci_vote['author_email'] = ( reviewer.get('email') or reviewer['username']).lower() diff --git a/stackalytics/processor/user_processor.py b/stackalytics/processor/user_processor.py index 97a4b66d0..0110d3b5f 100644 --- a/stackalytics/processor/user_processor.py +++ b/stackalytics/processor/user_processor.py @@ -13,11 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from stackalytics.openstack.common import log as logging -def make_user_id(email=None, launchpad_id=None, gerrit_id=None, + +LOG = logging.getLogger(__name__) + + +def make_user_id(emails=None, launchpad_id=None, gerrit_id=None, member_id=None): - if launchpad_id or email: - return launchpad_id or email + if launchpad_id or emails: + return launchpad_id or emails[0] if gerrit_id: return 'gerrit:%s' % gerrit_id if member_id: @@ -25,9 +30,23 @@ def make_user_id(email=None, launchpad_id=None, gerrit_id=None, def store_user(runtime_storage_inst, user): + write_flag = False + if not user.get('seq'): user['seq'] = runtime_storage_inst.inc_user_count() - runtime_storage_inst.set_by_key('user:%s' % user['seq'], user) + LOG.debug('New user: %s', user) + write_flag = True + else: + stored_user = runtime_storage_inst.get_by_key( + 'user:%d' % user.get('seq')) + if stored_user != user: + LOG.debug('User updated: %s', user) + write_flag = True + + if not write_flag: + return + + runtime_storage_inst.set_by_key('user:%d' % user['seq'], user) if user.get('user_id'): runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user) if user.get('launchpad_id'): @@ -53,4 +72,5 @@ def load_user(runtime_storage_inst, seq=None, user_id=None, email=None, def delete_user(runtime_storage_inst, user): + LOG.debug('Delete user: %s', user) runtime_storage_inst.delete_by_key('user:%s' % user['seq']) diff --git a/tests/unit/test_record_processor.py b/tests/unit/test_record_processor.py index 79cbfd776..ab9e6cfb8 100644 --- a/tests/unit/test_record_processor.py +++ b/tests/unit/test_record_processor.py @@ -139,7 +139,7 @@ class TestRecordProcessor(testtools.TestCase): author_name='John Doe')))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_email': 'johndoe@gmail.com', 'author_name': 'John Doe', 'company_name': 'NEC', @@ -170,7 +170,7 @@ class TestRecordProcessor(testtools.TestCase): date=1000000000)))[0] expected_commit = { - 'launchpad_id': 'john_doe', 
+ 'user_id': 'john_doe', 'author_email': 'johndoe@gmail.com', 'author_name': 'John Doe', 'company_name': '*independent', @@ -199,7 +199,7 @@ class TestRecordProcessor(testtools.TestCase): author_name='John Doe')))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_email': 'johndoe@ibm.com', 'author_name': 'John Doe', 'company_name': 'IBM', @@ -232,7 +232,7 @@ class TestRecordProcessor(testtools.TestCase): author_name='John Doe')))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_email': 'johndoe@ibm.com', 'author_name': 'John Doe', 'company_name': 'NEC', @@ -268,7 +268,7 @@ class TestRecordProcessor(testtools.TestCase): date=1000000000)))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_email': 'johndoe@nec.com', 'author_name': 'John Doe', 'company_name': 'IBM', @@ -296,7 +296,7 @@ class TestRecordProcessor(testtools.TestCase): author_name='John Doe')))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_email': 'johndoe@gmail.com', 'author_name': 'John Doe', 'company_name': 'NEC', @@ -326,7 +326,7 @@ class TestRecordProcessor(testtools.TestCase): author_name='John Doe')))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_email': 'johndoe@ibm.com', 'author_name': 'John Doe', 'company_name': 'IBM', @@ -352,7 +352,7 @@ class TestRecordProcessor(testtools.TestCase): author_name='John Doe')))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_email': 'johndoe@ibm.com', 'author_name': 'John Doe', 'company_name': 'IBM', @@ -566,14 +566,14 @@ class TestRecordProcessor(testtools.TestCase): self.assertRecordsMatch( {'record_type': 'bpd', - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_name': 'John Doe', 'company_name': '*independent'}, processed_records[0]) self.assertRecordsMatch( {'record_type': 'review', - 'launchpad_id': 'john_doe', + 'user_id': 
'john_doe', 'author_name': 'John Doe', 'author_email': 'john_doe@gmail.com', 'company_name': '*independent'}, @@ -625,7 +625,7 @@ class TestRecordProcessor(testtools.TestCase): self.assertRecordsMatch( {'record_type': 'commit', - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_name': 'John Doe', 'author_email': 'john_doe@gmail.com', 'company_name': '*independent'}, @@ -667,7 +667,7 @@ class TestRecordProcessor(testtools.TestCase): self.assertRecordsMatch( {'record_type': 'review', - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_name': 'John Doe', 'author_email': 'john_doe@gmail.com', 'company_name': '*independent'}, @@ -675,7 +675,7 @@ class TestRecordProcessor(testtools.TestCase): self.assertRecordsMatch( {'record_type': 'bpd', - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_name': 'John Doe', 'company_name': '*independent'}, processed_records[1]) @@ -1021,12 +1021,12 @@ class TestRecordProcessor(testtools.TestCase): self.assertEqual(3, len(processed_commits)) self.assertRecordsMatch({ - 'launchpad_id': 'tupac', + 'user_id': 'tupac', 'author_email': 'tupac.shakur@openstack.com', 'author_name': 'Tupac Shakur', }, processed_commits[0]) self.assertRecordsMatch({ - 'launchpad_id': 'jimi', + 'user_id': 'jimi', 'author_email': 'jimi.hendrix@openstack.com', 'author_name': 'Jimi Hendrix', }, processed_commits[2]) @@ -1320,7 +1320,7 @@ class TestRecordProcessor(testtools.TestCase): ))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_email': 'johndoe@gmail.com', 'author_name': 'John Doe', 'company_name': 'NEC', @@ -1353,7 +1353,7 @@ class TestRecordProcessor(testtools.TestCase): ))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_email': 'johndoe@gmail.com', 'author_name': 'John Doe', 'company_name': 'NEC', @@ -1387,7 +1387,7 @@ class TestRecordProcessor(testtools.TestCase): ))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 
'author_email': 'johndoe@gmail.com', 'author_name': 'John Doe', 'company_name': 'NEC', @@ -1421,7 +1421,7 @@ class TestRecordProcessor(testtools.TestCase): ))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_email': 'johndoe@gmail.com', 'author_name': 'John Doe', 'company_name': 'NEC', @@ -1454,7 +1454,7 @@ class TestRecordProcessor(testtools.TestCase): ))[0] expected_commit = { - 'launchpad_id': 'john_doe', + 'user_id': 'john_doe', 'author_email': 'johndoe@gmail.com', 'author_name': 'John Doe', 'company_name': 'NEC', diff --git a/tools/check_user_profiles.py b/tools/check_user_profiles.py new file mode 100644 index 000000000..15adc0e71 --- /dev/null +++ b/tools/check_user_profiles.py @@ -0,0 +1,51 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import memcache
+
+
+def check(expected, actual):
+    """Print both values when the two profiles differ."""
+    if expected != actual:
+        print('Expected: %s\nActual: %s' % (expected, actual))
+
+
+def main():
+    """Compare each user:<seq> profile with its copies under other keys."""
+    m = memcache.Client(['localhost:11211'])
+    # missing counter -> None -> 0; +1 so range() reaches seq == count
+    count = (m.get('user:count') or 0) + 1
+    users = [m.get('user:%d' % seq) for seq in range(count)]
+    users = [u for u in users if u]
+
+    for u in users:
+        user_id = u.get('user_id')
+        lp = u.get('launchpad_id')
+        g = u.get('gerrit_id')
+        emails = u.get('emails')
+
+        if user_id:
+            check(u, m.get('user:%s' % user_id.encode('utf8')))
+        if lp:
+            check(u, m.get('user:%s' % lp.encode('utf8')))
+        if g:
+            check(u, m.get('user:gerrit:%s' % g.encode('utf8')))
+        if emails:
+            for e in emails:
+                check(u, m.get('user:%s' % e.encode('utf8')))
+
+
+if __name__ == '__main__':
+    main()