Merge "zanata_stats: refactor"

This commit is contained in:
Jenkins 2017-03-18 10:42:52 +00:00 committed by Gerrit Code Review
commit cfd166165c

View File

@ -20,7 +20,6 @@ import csv
import datetime import datetime
import io import io
import json import json
import operator
import random import random
import re import re
import sys import sys
@ -38,6 +37,7 @@ ZANATA_VERSION_PATTERN = re.compile(r'^(master[-,a-z]*|stable-[a-z]+)$')
class ZanataUtility(object): class ZanataUtility(object):
"""Utilities to invoke Zanata REST API.""" """Utilities to invoke Zanata REST API."""
user_agents = [ user_agents = [
'Mozilla/5.0 (X11; Ubuntu; Linux x86_64) Gecko/20100101 Firefox/32.0', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64) Gecko/20100101 Firefox/32.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_6) AppleWebKit/537.78.2', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_6) AppleWebKit/537.78.2',
@ -55,89 +55,181 @@ class ZanataUtility(object):
print('exception happen', e) print('exception happen', e)
LOG.warning('Error "%(error)s" while reading uri %(uri)s', LOG.warning('Error "%(error)s" while reading uri %(uri)s',
{'error': e, 'uri': uri}) {'error': e, 'uri': uri})
raise
def read_json_from_uri(self, uri): def read_json_from_uri(self, uri):
data = self.read_uri(uri, {'Accept': 'application/json'})
try: try:
data = self.read_uri(uri, {'Accept': 'application/json'})
return json.loads(data) return json.loads(data)
except Exception as e: except Exception as e:
LOG.warning('Error "%(error)s" parsing json from uri %(uri)s', LOG.warning('Error "%(error)s" parsing json from uri %(uri)s',
{'error': e, 'uri': uri}) {'error': e, 'uri': uri})
raise
def zanata_get_projects(self): def get_projects(self):
uri = ZANATA_URI % ('projects') uri = ZANATA_URI % ('projects')
LOG.debug("Reading projects from %s" % uri) LOG.debug("Reading projects from %s" % uri)
projects_data = self.read_json_from_uri(uri) projects_data = self.read_json_from_uri(uri)
for project in projects_data: return [project['id'] for project in projects_data]
yield project['id']
def _is_valid_version(self, version): @staticmethod
def _is_valid_version(version):
return bool(ZANATA_VERSION_PATTERN.match(version)) return bool(ZANATA_VERSION_PATTERN.match(version))
def zanata_get_project_versions(self, project_id): def get_project_versions(self, project_id):
uri = ZANATA_URI % ('projects/p/%s' % project_id) uri = ZANATA_URI % ('projects/p/%s' % project_id)
LOG.debug("Reading iterations for project %s" % project_id) LOG.debug("Reading iterations for project %s" % project_id)
project_data = self.read_json_from_uri(uri) project_data = self.read_json_from_uri(uri)
if ('iterations' in project_data): if 'iterations' in project_data:
for interation_data in project_data['iterations']: return [interation_data['id']
if self._is_valid_version(interation_data['id']): for interation_data in project_data['iterations']
yield interation_data['id'] if self._is_valid_version(interation_data['id'])]
else: else:
yield None return []
def zanata_get_user_stats(self, project_id, iteration_id, zanata_user_id, def get_user_stats(self, project_id, iteration_id, zanata_user_id,
start_date, end_date): start_date, end_date):
uri = ZANATA_URI % ('stats/project/%s/version/%s/contributor/%s/%s..%s' uri = ZANATA_URI % ('stats/project/%s/version/%s/contributor/%s/%s..%s'
% (project_id, iteration_id, zanata_user_id, % (project_id, iteration_id, zanata_user_id,
start_date, end_date)) start_date, end_date))
return self.read_json_from_uri(uri) return self.read_json_from_uri(uri)
def _make_language_team(name, team_info): class LanguageTeam(object):
return {
'tag': 'language_team', def __init__(self, language_code, team_info):
'language_code': name, self.language_code = language_code
'language': team_info['language'], self.language = team_info['language']
# Zanata ID which only consists of numbers is a valid ID # Zanata ID which only consists of numbers is a valid ID in Zanata.
# and such entry is interpreted as integer unless it is # Such entry is interpreted as integer unless it is quoted
# quoted in the YAML file. Ensure to stringify them. # in the YAML file. Ensure to stringify them.
'translators': [str(i) for i in team_info['translators']], self.translators = [str(i) for i in team_info['translators']]
'reviewers': [str(i) for i in team_info.get('reviewers', [])], self.reviewers = [str(i) for i in team_info.get('reviewers', [])]
'coordinators': [str(i) for i in team_info.get('coordinators', [])], self.coordinators = [str(i) for i in team_info.get('coordinators', [])]
}
@classmethod
def load_from_language_team_yaml(cls, translation_team_uri, lang_list):
LOG.debug('Process list of language team from uri: %s',
translation_team_uri)
content = yaml.safe_load(io.open(translation_team_uri, 'r'))
if lang_list:
lang_notfound = [lang_code for lang_code in lang_list
if lang_code not in content]
if lang_notfound:
print('Language %s not tound in %s.' %
(', '.join(lang_notfound),
translation_team_uri))
sys.exit(1)
return [cls(lang_code, team_info)
for lang_code, team_info in content.items()
if not lang_list or lang_code in lang_list]
def _make_user(user_id, language_code): class User(object):
return {
'user_id': user_id,
'lang': language_code,
'translation-stats': {},
'review-stats': {},
}
def __init__(self, user_id, language_code):
self.user_id = user_id
self.lang = language_code
self.translation_stats = {}
self.review_stats = {}
def read_language_team_yaml(translation_team_uri, lang_list): def __str__(self):
LOG.debug('Process list of language team from uri: %s', return ("<%s: user_id=%s, lang=%s, "
translation_team_uri) "translation_stats=%s, review_stats=%s" %
(self.__class__.__name__,
self.user_id, self.lang,
self.translation_stats,
self.review_stats))
content = yaml.safe_load(io.open(translation_team_uri, 'r')) def __repr__(self):
language_teams = {} return repr(self.convert_to_serializable_data())
if lang_list: def __lt__(self, other):
lang_notfound = [lang_code for lang_code in lang_list if self.lang != other.lang:
if lang_code not in content] return self.lang < other.lang
if lang_notfound: else:
print('Language %s not tound in %s.' % return self.user_id < other.user_id
(', '.join(lang_notfound),
translation_team_uri))
sys.exit(1)
for lang_code, team_info in content.items(): def read_from_zanata_stats(self, zanata_stats):
if lang_list and lang_code not in lang_list: # data format (Zanata 3.9.6)
continue # {
language_teams[lang_code] = _make_language_team(lang_code, team_info) # "username": "amotoki",
# "contributions": [
# {
# "locale": "ja",
# "translation-stats": {
# "translated": 7360,
# "needReview": 0,
# "approved": 152,
# "rejected": 0
# },
# "review-stats": {
# "approved": 220,
# "rejected": 0
# }
# }
# ]
# }
stats = [d for d in zanata_stats['contributions']
if d['locale'] == self.lang]
if not stats:
return
return language_teams stats = stats[0]
trans_stats = stats.get('translation-stats')
if trans_stats:
trans_stats['total'] = sum(trans_stats.values())
self.translation_stats = trans_stats
review_stats = stats.get('review-stats')
if review_stats:
review_stats['total'] = sum(review_stats.values())
self.review_stats = review_stats
def needs_output(self, include_no_activities):
if include_no_activities:
return True
elif self.translation_stats or self.review_stats:
return True
else:
return False
@staticmethod
def get_flattened_data_title():
return [
'user_id',
'lang',
'translation-total',
'translated',
'needReview',
'approved',
'rejected',
'review-total',
'review-approved',
'review-rejected'
]
def convert_to_flattened_data(self):
return [
self.user_id,
self.lang,
self.translation_stats.get('total', 0),
self.translation_stats.get('translated', 0),
self.translation_stats.get('needReview', 0),
self.translation_stats.get('approved', 0),
self.translation_stats.get('rejected', 0),
self.review_stats.get('total', 0),
self.review_stats.get('approved', 0),
self.review_stats.get('rejected', 0),
]
def convert_to_serializable_data(self):
return {'user_id': self.user_id,
'lang': self.lang,
'translation-stats': self.translation_stats,
'review-stats': self.review_stats}
def get_zanata_stats(start_date, end_date, language_teams, project_list, def get_zanata_stats(start_date, end_date, language_teams, project_list,
@ -145,78 +237,39 @@ def get_zanata_stats(start_date, end_date, language_teams, project_list,
print('Getting Zanata contributors statistics (from %s to %s) ...' % print('Getting Zanata contributors statistics (from %s to %s) ...' %
(start_date, end_date)) (start_date, end_date))
zanataUtil = ZanataUtility() zanataUtil = ZanataUtility()
users = {} users = []
for language_code in language_teams: for team in language_teams:
language_team = language_teams[language_code] users += [User(user_id, team.language_code)
for user in language_team['translators']: for user_id in team.translators]
users[user] = _make_user(user, language_code)
if not project_list: if not project_list:
project_list = zanataUtil.zanata_get_projects() project_list = zanataUtil.get_projects()
for project_id in project_list: for project_id in project_list:
for version in zanataUtil.zanata_get_project_versions(project_id): for version in zanataUtil.get_project_versions(project_id):
if version_list and version not in version_list: if version_list and version not in version_list:
continue continue
for user_id in users: for user in users:
if user_list and user_id not in user_list: if user_list and user.user_id not in user_list:
continue continue
user = users.get(user_id)
print('Getting %(project_id)s %(version)s ' print('Getting %(project_id)s %(version)s '
'for user %(user_id)s %(user_lang)s' 'for user %(user_id)s %(user_lang)s'
% {'project_id': project_id, % {'project_id': project_id,
'version': version, 'version': version,
'user_id': user_id, 'user_id': user.user_id,
'user_lang': user['lang']}) 'user_lang': user.lang})
statisticdata = zanataUtil.zanata_get_user_stats( statisticdata = zanataUtil.get_user_stats(
project_id, version, user_id, start_date, end_date) project_id, version, user.user_id, start_date, end_date)
print('Got: %s' % statisticdata) print('Got: %s' % statisticdata)
if statisticdata: user.read_from_zanata_stats(statisticdata)
user_stats = parse_user_stat(statisticdata, user)
if user_stats:
user.update(user_stats)
print('=> %s' % user) print('=> %s' % user)
return users return users
def parse_user_stat(stats, user):
# data format (Zanata 3.9.6)
# {
# "username": "amotoki",
# "contributions": [
# {
# "locale": "ja",
# "translation-stats": {
# "translated": 7360,
# "needReview": 0,
# "approved": 152,
# "rejected": 0
# },
# "review-stats": {
# "approved": 220,
# "rejected": 0
# }
# }
# ]
# }
stat = [d for d in stats['contributions']
if d['locale'] == user['lang']]
if stat:
stat = stat[0]
if 'translation-stats' in stat:
stat['translation-stats']['total'] = \
sum(stat['translation-stats'].values())
if 'review-stats' in stat:
stat['review-stats']['total'] = \
sum(stat['review-stats'].values())
return stat
def write_stats_to_file(users, output_file, file_format, def write_stats_to_file(users, output_file, file_format,
include_no_activities): include_no_activities):
stats = [user for user in stats = sorted([user for user in users
sorted(users.values(), key=operator.itemgetter('lang', 'user_id')) if user.needs_output(include_no_activities)])
if _needs_output(include_no_activities, user)]
if file_format == 'csv': if file_format == 'csv':
_write_stats_to_csvfile(stats, output_file) _write_stats_to_csvfile(stats, output_file)
else: else:
@ -224,46 +277,19 @@ def write_stats_to_file(users, output_file, file_format,
print('Stats has been written to %s' % output_file) print('Stats has been written to %s' % output_file)
def _needs_output(include_no_activities, user):
if include_no_activities:
return True
elif user['translation-stats'] or user['review-stats']:
return True
else:
return False
def _write_stats_to_csvfile(stats, output_file): def _write_stats_to_csvfile(stats, output_file):
mode = 'w' if six.PY3 else 'wb' mode = 'w' if six.PY3 else 'wb'
with open(output_file, mode) as csvfile: with open(output_file, mode) as csvfile:
writer = csv.writer(csvfile) writer = csv.writer(csvfile)
writer.writerow(['user_id', writer.writerow(User.get_flattened_data_title())
'lang',
'translation-total',
'translated',
'needReview',
'approved',
'rejected',
'review-total',
'review-approved',
'review-rejected'])
for stat in stats: for stat in stats:
writer.writerow([stat['user_id'], writer.writerow(stat.convert_to_flattened_data())
stat['lang'],
stat['translation-stats'].get('total', 0),
stat['translation-stats'].get('translated', 0),
stat['translation-stats'].get('needReview', 0),
stat['translation-stats'].get('approved', 0),
stat['translation-stats'].get('rejected', 0),
stat['review-stats'].get('total', 0),
stat['review-stats'].get('approved', 0),
stat['review-stats'].get('rejected', 0),
])
def _write_stats_to_jsonfile(stats, output_file): def _write_stats_to_jsonfile(stats, output_file):
stats = [stat.convert_to_serializable_data() for stat in stats]
with open(output_file, 'w') as f: with open(output_file, 'w') as f:
f.write(json.dumps(stats, indent=4)) f.write(json.dumps(stats, indent=4, sort_keys=True))
def _comma_separated_list(s): def _comma_separated_list(s):
@ -320,7 +346,8 @@ def main():
help="YAML file of the user list") help="YAML file of the user list")
options = parser.parse_args() options = parser.parse_args()
language_teams = read_language_team_yaml(options.user_yaml, options.lang) language_teams = LanguageTeam.load_from_language_team_yaml(
options.user_yaml, options.lang)
versions = [v.replace('/', '-') for v in options.target_version or []] versions = [v.replace('/', '-') for v in options.target_version or []]
users = get_zanata_stats(options.start_date, options.end_date, users = get_zanata_stats(options.start_date, options.end_date,