stackalytics/stackalytics/processor/record_processor.py

817 lines
32 KiB
Python

# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import bisect
import collections
import copy
import functools
import time
from oslo_config import cfg
from oslo_log import log as logging
import six
from stackalytics.processor import launchpad_utils
from stackalytics.processor import user_processor
from stackalytics.processor import utils
# Text looked for in review comments of merged changes to find the moment
# the change was merged (see RecordProcessor._make_review_record).
MERGE_MESSAGE = "Change has been successfully merged"

# Module logger and the global oslo.config registry (options such as
# ``fetching_user_source`` are read from it).
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class RecordProcessor(object):
    """Normalize raw records and keep stored records consistent.

    ``process`` converts raw records (commits, reviews, emails, blueprints,
    bugs, members and translations) into dashboard records, resolving each
    author to a user profile kept in runtime storage.  ``post_processing``
    runs a series of passes over all stored records to refresh data that
    depends on other records or on configuration (user info, releases,
    merge dates, module aliases, blueprint mentions, core reviewers,
    member companies and review disagreements).
    """

    def __init__(self, runtime_storage_inst):
        """Load company and release data from runtime storage.

        :param runtime_storage_inst: storage used to read and write users
            and records.
        """
        self.runtime_storage_inst = runtime_storage_inst

        # company lookup data; passed to user_processor helpers and looked
        # up by normalized company name for members
        self.domains_index = runtime_storage_inst.get_by_key('companies')

        self.releases = runtime_storage_inst.get_by_key('releases')
        # NOTE(review): _get_release bisects this list, so releases are
        # assumed to be ordered by end_date -- confirm
        self.releases_dates = [r['end_date'] for r in self.releases]

        # built lazily by _get_modules()
        self.modules = None
        self.alias_module_map = None

    def _get_release(self, timestamp):
        """Return the name of the release ``timestamp`` belongs to.

        That is the first release whose ``end_date`` is strictly greater
        than ``timestamp`` (``bisect.bisect`` is ``bisect_right``).
        Timestamps beyond the last release are attributed to the last
        release and a warning is logged.
        """
        release_index = bisect.bisect(self.releases_dates, timestamp)
        if release_index >= len(self.releases):
            LOG.warning('Timestamp %s is beyond releases boundaries, the last '
                        'release will be used. Please consider adding a '
                        'new release into default_data.json', timestamp)
            release_index = len(self.releases) - 1
        return self.releases[release_index]['release_name']

    def _get_modules(self):
        """Return ``(modules, alias_module_map)``, building them on first use.

        ``modules`` is the set of module names and aliases that
        ``_guess_module`` searches for in email subjects.  A name that
        contains an already known name is not added.  A new name that is
        contained in a known name replaces that name.
        ``alias_module_map`` maps each alias to its lower-cased module name.
        """
        if self.modules is None:
            self.modules = set()
            self.alias_module_map = dict()

            for repo in utils.load_repos(self.runtime_storage_inst):
                module = repo['module'].lower()
                module_aliases = repo.get('aliases') or []

                # NOTE(review): ``add`` is not reset per name, so once one
                # name of a repo is rejected its remaining aliases are
                # skipped as well -- confirm this is intended
                add = True
                for module_name in ([module] + module_aliases):
                    for m in self.modules:
                        if module_name.find(m) >= 0:
                            # a shorter known name already covers this one
                            add = False
                            break
                        if m.find(module_name) >= 0:
                            # the new name is shorter: drop the longer one;
                            # break right away since the set was mutated
                            self.modules.remove(m)
                            break
                    if add:
                        self.modules.add(module_name)

                for alias in module_aliases:
                    self.alias_module_map[alias] = module

        return self.modules, self.alias_module_map

    def _need_to_fetch_launchpad(self):
        """Return True if user data should be looked up in Launchpad."""
        return CONF.fetching_user_source == 'launchpad'

    def _update_user(self, record):
        """Find, create or merge the user profile of the author of ``record``.

        Existing profiles are loaded by email, Gerrit id (hostname and
        username), Zanata id and Launchpad id.  When Launchpad fetching is
        enabled and no Launchpad id is known, Launchpad is queried to guess
        one.  If ``user_processor.are_users_same`` reports the found
        profiles as the same, the email profile is returned unchanged and
        nothing is stored.  Otherwise a new profile is created, merged with
        every found profile (superseded profiles are deleted) and stored.

        :param record: dict with optional ``author_email``, ``author_name``,
            ``launchpad_id``, ``gerrit_id`` (plus ``gerrit_hostname``) and
            ``zanata_id`` keys.
        :returns: the user profile dict.
        """
        email = record.get('author_email')
        user_e = user_processor.load_user(
            self.runtime_storage_inst, email=email) or {}

        user_name = record.get('author_name')
        launchpad_id = record.get('launchpad_id')
        if (self._need_to_fetch_launchpad() and email and (not user_e) and
                (not launchpad_id) and (not user_e.get('launchpad_id'))):
            # query LP
            launchpad_id, lp_user_name = launchpad_utils.query_lp_info(email)
            if lp_user_name:
                user_name = lp_user_name

        gerrit_id = record.get('gerrit_id')
        gerrit_tuple = None
        if gerrit_id:
            gerrit_tuple = (record['gerrit_hostname'], gerrit_id)
            user_g = user_processor.load_user(
                self.runtime_storage_inst, gerrit_tuple=gerrit_tuple) or {}
            if (self._need_to_fetch_launchpad() and (not user_g) and
                    (not launchpad_id) and (not user_e.get('launchpad_id'))):
                # query LP: accept the Gerrit username as Launchpad id only
                # if the Launchpad display name matches the author name
                guessed_lp_id = gerrit_id
                lp_user_name = launchpad_utils.query_lp_user_name(
                    guessed_lp_id)
                if lp_user_name == user_name:
                    launchpad_id = guessed_lp_id
        else:
            user_g = {}

        zanata_id = record.get('zanata_id')
        if zanata_id:
            user_z = user_processor.load_user(
                self.runtime_storage_inst, zanata_id=zanata_id) or {}
            if (self._need_to_fetch_launchpad() and (not user_z) and
                    (not launchpad_id) and (not user_e.get('launchpad_id'))):
                # query LP
                # NOTE(review): unlike the Gerrit branch, this overwrites
                # user_name with the lookup result (possibly None) and
                # accepts the guess whenever that result differs from the
                # id -- confirm this is intended
                guessed_lp_id = zanata_id
                user_name = launchpad_utils.query_lp_user_name(guessed_lp_id)
                if user_name != guessed_lp_id:
                    launchpad_id = guessed_lp_id
        else:
            user_z = {}

        user_l = user_processor.load_user(
            self.runtime_storage_inst, launchpad_id=launchpad_id) or {}

        if user_processor.are_users_same([user_e, user_l, user_g, user_z]):
            # If sequence numbers are set and the same, merge is not needed
            return user_e

        user = user_processor.create_user(
            self.domains_index, launchpad_id, email, gerrit_tuple, zanata_id,
            user_name)

        if user_e or user_l or user_g or user_z:
            # merge between existing profiles and a new one
            user, users_to_delete = user_processor.merge_user_profiles(
                self.domains_index, [user_e, user_l, user_g, user_z, user])

            # delete all unneeded profiles
            user_processor.delete_users(
                self.runtime_storage_inst, users_to_delete)
        else:
            # create new profile
            if self._need_to_fetch_launchpad() and not user_name:
                user_name = launchpad_utils.query_lp_user_name(launchpad_id)
                if user_name:
                    user['user_name'] = user_name
            LOG.debug('Created new user: %s', user)

        user_processor.store_user(self.runtime_storage_inst, user)
        LOG.debug('Stored user: %s', user)

        return user

    def _update_record_and_user(self, record):
        """Resolve the author of ``record`` and set its user fields.

        Sets ``user_id`` and ``company_name``, and sets ``author_name``
        when the profile has a name.  The company is the user's
        affiliation at ``record['date']``.  For non-static profiles whose
        affiliation policy is 'open' (and not '*robots'), the company
        derived from the author email takes precedence when one is found.
        """
        user = self._update_user(record)

        record['user_id'] = user['user_id']
        if user.get('user_name'):
            record['author_name'] = user['user_name']

        company, policy = user_processor.get_company_for_date(
            user['companies'], record['date'])

        if not user.get('static'):
            # for auto-generated profiles affiliation may be overridden
            if company != '*robots' and policy == 'open':
                company = (user_processor.get_company_by_email(
                    self.domains_index, record.get('author_email')) or company)

        record['company_name'] = company

    def _process_commit(self, record):
        """Yield the normalized record(s) of a commit.

        Without co-authors one record is yielded, unless the author is
        affiliated with '*robots'.  With co-authors (the author is added
        to the list when missing), one copy of the commit is yielded per
        co-author with the co-author's user fields merged in and the
        co-author email appended to ``primary_key``.
        NOTE(review): the '*robots' filter is not applied to co-authored
        commits.
        """
        record['primary_key'] = record['commit_id']
        record['loc'] = record['lines_added'] + record['lines_deleted']
        record['author_email'] = record['author_email'].lower()
        record['commit_date'] = record['date']

        coauthors = record.get('coauthor')
        if not coauthors:
            self._update_record_and_user(record)

            if record['company_name'] != '*robots':
                yield record
        else:
            if record['author_email'] not in [
                    c['author_email'] for c in coauthors]:
                coauthors.append({'author_name': record['author_name'],
                                  'author_email': record['author_email']})
            for coauthor in coauthors:
                # the affiliation is resolved at the commit date
                coauthor['date'] = record['date']
                self._update_record_and_user(coauthor)

            for coauthor in coauthors:
                new_record = copy.deepcopy(record)
                new_record.update(coauthor)
                new_record['primary_key'] += coauthor['author_email']

                yield new_record

    def _make_review_record(self, record):
        """Build a 'review' record from a raw review (change) record.

        Everything except patchSets/owner/createdOn/comments is copied and
        the owner is flattened into gerrit_id/author_name/author_email.
        ``value`` is the lowest approval of the last patch set (0 if none).
        ``updated_on`` is the first approval time of the last patch set,
        or that patch set's creation time, or the review creation time.
        For merged reviews ``merge_date`` comes from the latest comment
        containing MERGE_MESSAGE, falling back to ``lastUpdated``.
        """
        # copy everything except patchsets and flatten user data
        review = {k: v for k, v in six.iteritems(record)
                  if k not in ('patchSets', 'owner', 'createdOn', 'comments')}
        owner = record.get('owner', {})

        review['primary_key'] = review['id']
        if owner.get('username'):
            review['gerrit_id'] = owner['username']
        review['author_name'] = (owner.get('name') or owner.get('username')
                                 or 'Anonymous Coward')  # do it like gerrit
        if owner.get('email'):
            review['author_email'] = owner['email'].lower()
        review['date'] = record['createdOn']

        patch_sets = record.get('patchSets', [])
        review['updated_on'] = review['date']
        if patch_sets:
            patch = patch_sets[-1]
            if 'approvals' in patch:
                review['value'] = min([int(p['value'])
                                       for p in patch['approvals']])
                review['updated_on'] = patch['approvals'][0]['grantedOn']
            else:
                review['updated_on'] = patch['createdOn']

        if record.get('status') == 'MERGED':
            # the latest merge comment wins
            for comment in reversed(record.get('comments') or []):
                if MERGE_MESSAGE in comment['message']:
                    review['merge_date'] = comment['timestamp']
                    break

        if 'value' not in review:
            review['value'] = 0

        if record.get('status') == 'MERGED' and 'merge_date' not in review:
            review['merge_date'] = review['lastUpdated']

        self._update_record_and_user(review)
        return review

    def _make_patch_record(self, review, patch):
        """Build a 'patch' record for one patch set of ``review``.

        The uploader is flattened into gerrit_id/author_name/author_email;
        module, branch and Gerrit hostname are copied from the review.
        """
        patch_record = dict()
        patch_record['record_type'] = 'patch'
        patch_record['primary_key'] = utils.get_patch_id(
            review['id'], patch['number'])
        patch_record['number'] = patch['number']
        patch_record['date'] = patch['createdOn']
        uploader = patch['uploader']
        if uploader.get('username'):
            patch_record['gerrit_id'] = uploader['username']
        patch_record['author_name'] = (uploader.get('name')
                                       or uploader.get('username')
                                       or 'Anonymous Coward')
        if uploader.get('email'):
            patch_record['author_email'] = uploader['email'].lower()
        patch_record['gerrit_hostname'] = review['gerrit_hostname']
        patch_record['module'] = review['module']
        patch_record['branch'] = review['branch']
        patch_record['review_id'] = review['id']

        self._update_record_and_user(patch_record)
        return patch_record

    def _make_mark_record(self, review, patch, approval):
        """Build a 'mark' record from an approval of ``patch``.

        ``approval['by']`` must contain 'username' and 'email'.  When the
        reviewer is also the patch uploader the type becomes 'Self-<type>'
        (after ``primary_key`` has been computed from the original type).
        """
        # copy everything and flatten user data
        mark = {k: v for k, v in six.iteritems(approval)
                if k not in ('by', 'grantedOn', 'value', 'description')}
        reviewer = approval['by']

        mark['record_type'] = 'mark'
        mark['value'] = int(approval['value'])
        mark['date'] = approval['grantedOn']
        mark['primary_key'] = (review['id'] + str(mark['date']) + mark['type'])
        mark['gerrit_id'] = reviewer['username']
        mark['author_name'] = reviewer.get('name') or reviewer.get('username')
        mark['author_email'] = reviewer['email'].lower()
        mark['module'] = review['module']
        mark['branch'] = review['branch']
        mark['review_id'] = review['id']
        mark['patch'] = int(patch['number'])
        mark['gerrit_hostname'] = review['gerrit_hostname']

        if reviewer['username'] == patch.get('uploader', {}).get('username'):
            # reviewer is the same as author of the patch
            mark['type'] = 'Self-%s' % mark['type']

        self._update_record_and_user(mark)
        return mark

    def _process_review(self, record):
        """Process a review.

        Review spawns into records of three types:
          * review - records that a user created review request
          * patch - records that a user submitted another patch set
          * mark - records that a user set approval mark to given review
            (Code-Review and Workflow approvals, plus an 'Abandon' mark,
            attached to the last patch set, for every 'Abandoned' comment
            of an abandoned review)

        Owners, uploaders and reviewers lacking the identifying fields
        checked below are skipped.
        """
        owner = record.get('owner', {})
        if 'owner' not in record:
            LOG.info('Record has no owner. %s', record)

        if 'email' in owner or 'username' in owner:
            yield self._make_review_record(record)

        for patch in record.get('patchSets', []):
            uploader = patch.get('uploader', {})
            if 'email' in uploader or 'username' in uploader:
                yield self._make_patch_record(record, patch)

            # patches without approvals were not reviewed by anyone
            for approval in patch.get('approvals') or []:
                if approval.get('type') not in ('Code-Review', 'Workflow'):
                    continue  # keep only Code-Review and Workflow
                reviewer = approval.get('by', {})
                if 'email' not in reviewer or 'username' not in reviewer:
                    continue  # _make_mark_record needs both fields
                yield self._make_mark_record(record, patch, approval)

        # check for abandon action
        if record.get('status') == 'ABANDONED':
            patch_sets = record.get('patchSets')
            for comment in reversed(record.get('comments') or []):
                if comment.get('message') != 'Abandoned':
                    continue
                # 'reviewer' may be missing or null; without this fallback
                # the membership test below raises TypeError on None
                reviewer = comment.get('reviewer') or {}
                if 'email' not in reviewer or 'username' not in reviewer:
                    continue  # ignore
                if not patch_sets:
                    continue  # no patch set to attach the mark to
                action = dict(type='Abandon', value=0)
                action['by'] = reviewer
                action['grantedOn'] = comment.get('timestamp')
                yield self._make_mark_record(record, patch_sets[-1], action)

    def _guess_module(self, record):
        """Set ``record['module']`` for an email, using its subject.

        A known module name found in the subject replaces the current best
        guess when it occurs earlier and is longer.  The guess is applied
        when it directly follows '[' or when the record has no module yet.
        Records still without a module get 'unknown'; aliases are mapped
        to their module.
        """
        subject = record['subject'].lower()
        pos = len(subject)
        best_guess_module = None

        modules, alias_module_map = self._get_modules()
        for module in modules:
            find = subject.find(module)
            if ((find >= 0) and (find < pos) and
                    (len(module) > len(best_guess_module or ''))):
                pos = find
                best_guess_module = module

        if best_guess_module:
            if (((pos > 0) and (subject[pos - 1] == '[')) or
                    (not record.get('module'))):
                record['module'] = best_guess_module

        if not record.get('module'):
            record['module'] = 'unknown'
        elif record['module'] in alias_module_map:
            record['module'] = alias_module_map[record['module']]

    def _process_email(self, record):
        """Yield the normalized record of a mailing-list email.

        The body is dropped unless the email references blueprints, in
        which case it is truncated to 4000 characters plus '...'.  A
        missing or null body is tolerated instead of dropping the record.
        """
        record['primary_key'] = record['message_id']
        record['author_email'] = record['author_email'].lower()

        self._update_record_and_user(record)
        self._guess_module(record)

        if not record.get('blueprint_id'):
            record.pop('body', None)
        else:
            body = record.get('body') or ''
            if len(body) > 4000:
                record['body'] = body[:4000] + '...'

        yield record

    def _process_blueprint(self, record):
        """Yield 'bpd' and possibly 'bpc' records for a blueprint.

        'bpd' is attributed to the drafter (or the owner) at the creation
        date.  'bpc' is yielded only for implemented blueprints with an
        assignee and a completion date, and is attributed to the assignee
        at the completion date.  Keys containing '_link' are not copied.
        """
        bpd_author = record.get('drafter') or record.get('owner')

        bpd = {k: v for k, v in six.iteritems(record)
               if k.find('_link') < 0}
        bpd['record_type'] = 'bpd'
        bpd['primary_key'] = 'bpd:' + record['id']
        bpd['launchpad_id'] = bpd_author
        bpd['date'] = record['date_created']
        bpd['web_link'] = record.get('web_link')

        self._update_record_and_user(bpd)

        yield bpd

        if (record.get('assignee') and record.get('date_completed') and
                record.get('implementation_status') == 'Implemented'):
            bpc = {k: v for k, v in six.iteritems(record)
                   if k.find('_link') < 0}
            bpc['record_type'] = 'bpc'
            bpc['primary_key'] = 'bpc:' + record['id']
            bpc['launchpad_id'] = record['assignee']
            bpc['date'] = record['date_completed']

            self._update_record_and_user(bpc)

            yield bpc

    def _process_bug(self, record):
        """Yield a 'bugf' record for a bug and, if fixed, a 'bugr' record.

        'bugf' is attributed to the owner at the creation date.  'bugr'
        is yielded for bugs in 'Fix Committed'/'Fix Released' status with
        a fix date, attributed to the assignee (or '*unassigned').
        """
        bug_created = record.copy()
        bug_created['primary_key'] = 'bugf:' + record['id']
        bug_created['record_type'] = 'bugf'
        bug_created['launchpad_id'] = record.get('owner')
        bug_created['date'] = record['date_created']

        self._update_record_and_user(bug_created)

        yield bug_created

        FIXED_BUGS = ['Fix Committed', 'Fix Released']
        if (('date_fix_committed' in record or 'date_fix_released' in record)
                and record['status'] in FIXED_BUGS):
            bug_fixed = record.copy()
            bug_fixed['primary_key'] = 'bugr:' + record['id']
            bug_fixed['record_type'] = 'bugr'
            bug_fixed['launchpad_id'] = record.get('assignee') or '*unassigned'
            # It appears that launchpad automatically sets the
            # date_fix_committed field when a bug moves from an open
            # state to Fix Released, however it isn't clear that this
            # is documented. So, we take the commit date if it is
            # present or the release date if no commit date is
            # present.
            bug_fixed['date'] = (
                record.get('date_fix_committed') or
                record['date_fix_released']
            )

            self._update_record_and_user(bug_fixed)

            yield bug_fixed

    def _process_member(self, record):
        """Yield the record of a member and pin the member's affiliation.

        The company is looked up in ``domains_index`` by normalized name,
        falling back to the normalized draft.  The member's profile is
        then rewritten to have that single company with ``end_date`` 0.
        """
        user_id = user_processor.make_user_id(member_id=record['member_id'])
        record['primary_key'] = user_id
        record['date'] = utils.member_date_to_timestamp(record['date_joined'])
        record['author_name'] = record['member_name']
        record['module'] = 'unknown'
        company_draft = record['company_draft']

        company_name = self.domains_index.get(utils.normalize_company_name(
            company_draft)) or (utils.normalize_company_draft(company_draft))

        # author_email is a key to create new user
        record['author_email'] = user_id
        record['company_name'] = company_name
        # _update_record_and_user function will create new user if needed
        self._update_record_and_user(record)
        # the member's declared company overrides the resolved affiliation
        record['company_name'] = company_name
        user = user_processor.load_user(self.runtime_storage_inst,
                                        user_id=user_id)

        user['user_name'] = record['author_name']
        user['companies'] = [{
            'company_name': company_name,
            'end_date': 0,
        }]
        user['company_name'] = company_name

        user_processor.store_user(self.runtime_storage_inst, user)

        yield record

    def _process_translation(self, record):
        """Yield a 'tr' record for translation statistics.

        The translated count is stored in ``loc`` and the language in
        ``value`` (standard fields kept in dashboard memory).
        """
        # todo split translation and approval
        translation = record.copy()
        user_id = user_processor.make_user_id(zanata_id=record['zanata_id'])

        translation['record_type'] = 'tr'
        translation['primary_key'] = '%s:%s:%s:%s' % (
            user_id, record['module'], record['date'], record['branch'])
        translation['author_name'] = user_id

        # following fields are put into standard fields stored in dashboard mem
        translation['loc'] = record['translated']
        translation['value'] = record['language']

        self._update_record_and_user(translation)

        yield translation

    def _renew_record_date(self, record, force_update_release=False):
        """Recompute ``week`` and, if missing or forced, ``release``."""
        record['week'] = utils.timestamp_to_week(record['date'])
        if (force_update_release or ('release' not in record) or
                (not record['release'])):
            record['release'] = self._get_release(record['date'])

    def process(self, record_iterator):
        """Convert raw records into dashboard records.

        Each raw record is dispatched on ``record_type``; every produced
        record gets ``week`` and ``release`` filled in.  A KeyError
        (including an unknown ``record_type``) is logged and the rest of
        that raw record is skipped; records already yielded for it stay.

        :param record_iterator: iterable of raw record dicts.
        :returns: generator of processed record dicts.
        """
        PROCESSORS = {
            'commit': self._process_commit,
            'review': self._process_review,
            'email': self._process_email,
            'bp': self._process_blueprint,
            'bug': self._process_bug,
            'member': self._process_member,
            'i18n': self._process_translation,
        }

        for record in record_iterator:
            try:
                for r in PROCESSORS[record['record_type']](record):
                    self._renew_record_date(r)
                    yield r
            except KeyError as e:
                LOG.error("Could not process record %s because "
                          "of key error %s", record, e)

    def _update_records_with_releases(self, release_index):
        """Pipeline step: re-assign releases, yielding changed records.

        Commits listed in ``release_index`` take the release from there;
        every other record gets the release of its date.
        """
        LOG.info('Update records with releases')

        def record_handler(record):
            if (record['record_type'] == 'commit'
                    and record['primary_key'] in release_index):
                release = release_index[record['primary_key']]
            else:
                release = self._get_release(record['date'])

            if record['release'] != release:
                record['release'] = release
                yield record

        yield record_handler

    def _update_records_with_user_info(self):
        """Pipeline step: re-resolve authors, yielding changed records."""
        LOG.info('Update user info in records')

        def record_handler(record):
            company_name = record['company_name']
            user_id = record['user_id']
            author_name = record['author_name']

            self._update_record_and_user(record)

            if ((record['company_name'] != company_name) or
                    (record['user_id'] != user_id) or
                    (record['author_name'] != author_name)):
                LOG.debug('User info (%(id)s, %(name)s, %(company)s) has '
                          'changed in record %(record)s',
                          {'id': user_id, 'name': author_name,
                           'company': company_name, 'record': record})
                yield record

        yield record_handler

    def _update_commits_with_merge_date(self, release_index):
        """Pipeline step: set commit dates to the merge date of their review.

        Pass 1 collects merge dates of merged reviews by change id.  Pass 2
        updates commits that reference exactly one known change id; the
        release is recomputed unless the commit is in ``release_index``.
        """
        LOG.info('Update commits with merge date')

        change_id_to_date = {}

        def record_handler_pass_1(record):
            if (record['record_type'] == 'review' and
                    record.get('status') == 'MERGED'):
                # keep compatibility with old records
                change_id_to_date[record['id']] = (
                    record.get('merge_date') or record.get('last_updated'))

        yield record_handler_pass_1

        LOG.info('Update commits with merge date: pass 2')

        def record_handler_pass_2(record):
            if record['record_type'] == 'commit':
                change_id_list = record.get('change_id')
                if change_id_list and len(change_id_list) == 1:
                    change_id = change_id_list[0]
                    if change_id in change_id_to_date:
                        old_date = record['date']
                        if old_date != change_id_to_date[change_id]:
                            record['date'] = change_id_to_date[change_id]
                            force_update_release = (
                                record['primary_key'] not in release_index
                            )
                            self._renew_record_date(record,
                                                    force_update_release)
                            LOG.debug('Date %(date)s has changed in record '
                                      '%(record)s', {'date': old_date,
                                                     'record': record})
                            yield record

        yield record_handler_pass_2

    def _update_blueprints_with_mention_info(self):
        """Pipeline step: drop unknown blueprint ids and count mentions.

        Pass 1 counts, per blueprint id, the records mentioning it and the
        latest mention date, and collects the ids of 'bpd'/'bpc' records.
        Pass 2 removes unknown ids from ``blueprint_id`` and stores
        ``mention_count``/``mention_date`` on blueprint records, yielding
        only the records that changed.
        """
        LOG.info('Process blueprints and calculate mention info')

        valid_blueprints = {}
        mentioned_blueprints = {}

        def record_handler_pass_1(record):
            for bp in record.get('blueprint_id', []):
                if bp in mentioned_blueprints:
                    mentioned_blueprints[bp]['count'] += 1
                    if record['date'] > mentioned_blueprints[bp]['date']:
                        mentioned_blueprints[bp]['date'] = record['date']
                else:
                    mentioned_blueprints[bp] = {
                        'count': 1,
                        'date': record['date']
                    }
            if record['record_type'] in ['bpd', 'bpc']:
                valid_blueprints[record['id']] = {
                    'primary_key': record['primary_key'],
                    'count': 0,
                    'date': record['date']
                }

        yield record_handler_pass_1

        for bp_name, bp in six.iteritems(valid_blueprints):
            if bp_name in mentioned_blueprints:
                bp['count'] = mentioned_blueprints[bp_name]['count']
                bp['date'] = mentioned_blueprints[bp_name]['date']
            else:
                bp['count'] = 0
                bp['date'] = 0

        LOG.info('Process blueprints and calculate mention info: pass 2')

        def record_handler_pass_2(record):
            need_update = False

            valid_bp = set([])
            for bp in record.get('blueprint_id', []):
                if bp in valid_blueprints:
                    valid_bp.add(bp)
                else:
                    LOG.debug('Update record %s: removed invalid bp: %s',
                              record['primary_key'], bp)
                    need_update = True
            record['blueprint_id'] = list(valid_bp)

            if record['record_type'] in ['bpd', 'bpc']:
                bp = valid_blueprints[record['id']]
                if ((record.get('mention_count') != bp['count']) or
                        (record.get('mention_date') != bp['date'])):
                    record['mention_count'] = bp['count']
                    record['mention_date'] = bp['date']
                    LOG.debug('Update record %s: mention stats: (%s:%s)',
                              record['primary_key'], bp['count'], bp['date'])
                    need_update = True

            if need_update:
                yield record

        yield record_handler_pass_2

    def _determine_core_contributors(self):
        """Pipeline step: recompute each user's ``core`` list.

        ``core`` holds the (module, branch) pairs where the user gave a
        mark with value 2 or -2 during the last 90 days.  Users are stored
        only when their list changed.
        """
        LOG.info('Determine core contributors')

        module_branches = collections.defaultdict(set)
        quarter_ago = int(time.time()) - 60 * 60 * 24 * 30 * 3  # a quarter ago

        def record_handler(record):
            if (record['record_type'] == 'mark' and
                    record['date'] > quarter_ago and
                    record['value'] in [2, -2]):
                module_branch = (record['module'], record['branch'])
                user_id = record['user_id']
                module_branches[user_id].add(module_branch)

        yield record_handler

        for user in self.runtime_storage_inst.get_all_users():
            core_old = user.get('core')
            user_module_branch = module_branches.get(user['user_id'])
            if user_module_branch:
                user['core'] = list(user_module_branch)
            elif user.get('core'):
                del user['core']

            if user.get('core') != core_old:
                user_processor.store_user(self.runtime_storage_inst, user)

    def _close_patch(self, cores, marks):
        """Yield the marks of one patch set whose ``disagreement`` changed.

        Marks are walked newest first.  The newest mark from a core
        reviewer of its (module, branch) is the reference vote and is left
        untouched.  Every other mark is flagged as disagreeing when its
        value has the opposite sign of that vote.  Marks newer than the
        reference vote are never flagged.  Lists with fewer than two marks
        are ignored.

        :param cores: set of (module, branch, user_id) tuples.
        :param marks: list of mark records of a single patch set.
        """
        if len(marks) < 2:
            return

        core_mark = 0
        for mark in sorted(marks, key=lambda x: x['date'], reverse=True):

            if core_mark == 0:
                if (mark['module'], mark['branch'], mark['user_id']) in cores:
                    # mark is from core engineer
                    core_mark = mark['value']
                    continue

            disagreement = ((core_mark != 0) and
                            ((core_mark < 0 < mark['value']) or
                             (core_mark > 0 > mark['value'])))
            old_disagreement = mark.get('disagreement', False)
            mark['disagreement'] = disagreement
            if old_disagreement != disagreement:
                yield mark

    def _update_marks_with_disagreement(self):
        """Pipeline step: flag Code-Review marks disagreeing with a core.

        Marks are grouped per review and patch set; when a newer patch set
        of a review shows up, the previous group is closed via
        ``_close_patch`` and changed marks are yielded.  Groups still open
        at the end are written directly to runtime storage.
        """
        # empty first pass; presumably so that ``core`` lists are read only
        # after _determine_core_contributors has stored them -- confirm
        yield lambda record: None

        LOG.info('Process marks to find disagreements')

        cores = set()
        for user in self.runtime_storage_inst.get_all_users():
            for (module, branch) in (user.get('core') or []):
                cores.add((module, branch, user['user_id']))

        # map from review_id to current patch and list of marks
        marks_per_patch = collections.defaultdict(
            lambda: {'patch_number': 0, 'marks': []})

        def record_handler(record):
            if (record['record_type'] == 'mark' and
                    record['type'] == 'Code-Review'):
                review_id = record['review_id']
                patch_number = record['patch']

                if review_id in marks_per_patch:
                    # review is already seen, check if patch is newer
                    if (marks_per_patch[review_id]['patch_number'] <
                            patch_number):
                        # the patch is new, close the current
                        for processed in self._close_patch(
                                cores, marks_per_patch[review_id]['marks']):
                            yield processed
                        del marks_per_patch[review_id]

                # NOTE(review): a mark of an older patch set arriving after
                # a newer one is appended to the newer group and resets
                # patch_number -- confirm records arrive in patch order
                marks_per_patch[review_id]['patch_number'] = patch_number
                marks_per_patch[review_id]['marks'].append(record)

        yield record_handler

        # purge the rest
        for marks_patch in marks_per_patch.values():
            self.runtime_storage_inst.set_records(
                self._close_patch(cores, marks_patch['marks']))

    def _update_members_company_name(self):
        """Pipeline step: refresh member companies from their drafts.

        When the company resolved from ``company_draft`` differs from the
        stored one, the record is updated and yielded, and the member's
        profile is rewritten with that single company.
        """
        LOG.info('Update members with company names')

        def record_handler(record):
            if record['record_type'] != 'member':
                return

            company_draft = record['company_draft']
            company_name = self.domains_index.get(
                utils.normalize_company_name(company_draft)) or (
                    utils.normalize_company_draft(company_draft))

            if company_name == record['company_name']:
                return

            LOG.debug('Update record %s, company name changed to %s',
                      record, company_name)
            record['company_name'] = company_name

            yield record

            user = user_processor.load_user(self.runtime_storage_inst,
                                            user_id=record['user_id'])
            LOG.debug('Update user %s, company name changed to %s',
                      user, company_name)
            user['companies'] = [{
                'company_name': company_name,
                'end_date': 0,
            }]
            user_processor.store_user(self.runtime_storage_inst, user)

        yield record_handler

    def _update_commits_with_module_alias(self):
        """Pipeline step: replace aliased commit modules by the real module."""
        LOG.info('Update record with aliases')

        _, alias_module_map = self._get_modules()

        def record_handler(record):
            if record['record_type'] != 'commit':
                return

            rec_module = record.get('module', None)
            if rec_module and rec_module in alias_module_map:
                record['module'] = alias_module_map[rec_module]
                yield record

        yield record_handler

    def post_processing(self, release_index):
        """Run all post-processing steps over the stored records.

        Each step is a generator that yields record handlers and may run
        code between yields.  Judging by the steps, each yielded handler is
        applied to every stored record before the step is resumed, and the
        records a handler yields are written back via ``set_records`` --
        TODO confirm against ``utils.make_pipeline_processor``.

        :param release_index: mapping from commit primary key to release
            name; these commits keep that release.
        """
        processors = [
            self._update_records_with_user_info,
            functools.partial(self._update_commits_with_merge_date,
                              release_index),
            functools.partial(self._update_records_with_releases,
                              release_index),
            self._update_commits_with_module_alias,
            self._update_blueprints_with_mention_info,
            self._determine_core_contributors,
            self._update_members_company_name,
            self._update_marks_with_disagreement,
        ]

        pipeline_processor = utils.make_pipeline_processor(processors)

        self.runtime_storage_inst.set_records(pipeline_processor(
            self.runtime_storage_inst.get_all_records))