diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index af725e728..da2a2de28 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -20,7 +20,6 @@ from oslo_config import cfg from oslo_log import log as logging import psutil import six -import time from stackalytics.processor import bps from stackalytics.processor import config @@ -79,46 +78,99 @@ def _record_typer(record_iterator, record_type): yield record -def _process_repo(repo, runtime_storage_inst, record_processor_inst, - rcs_inst): - uri = repo['uri'] - quoted_uri = six.moves.urllib.parse.quote_plus(uri) - LOG.info('Processing repo uri: %s', uri) +def _get_repo_branches(repo): + return ({repo.get('default_branch', 'master')} | + set(r['branch'] for r in repo.get('releases', []) + if 'branch' in r)) + + +def _process_repo_blueprints(repo, runtime_storage_inst, + record_processor_inst): + LOG.info('Processing blueprints for repo: %s', repo['uri']) - LOG.info('Processing blueprints for repo uri: %s', uri) bp_iterator = lp.log(repo) bp_iterator_typed = _record_typer(bp_iterator, 'bp') - processed_bp_iterator = record_processor_inst.process( - bp_iterator_typed) + processed_bp_iterator = record_processor_inst.process(bp_iterator_typed) + runtime_storage_inst.set_records(processed_bp_iterator, utils.merge_records) - LOG.info('Processing bugs for repo uri: %s', uri) + +def _process_repo_bugs(repo, runtime_storage_inst, record_processor_inst): + LOG.info('Processing bugs for repo: %s', repo['uri']) + current_date = utils.date_to_timestamp('now') bug_modified_since = runtime_storage_inst.get_by_key( 'bug_modified_since-%s' % repo['module']) bug_iterator = bps.log(repo, bug_modified_since) bug_iterator_typed = _record_typer(bug_iterator, 'bug') - processed_bug_iterator = record_processor_inst.process( - bug_iterator_typed) + processed_bug_iterator = record_processor_inst.process(bug_iterator_typed) + runtime_storage_inst.set_records(processed_bug_iterator, utils.merge_records) + runtime_storage_inst.set_by_key('bug_modified_since-%s' % repo['module'], + current_date) - runtime_storage_inst.set_by_key( - 'bug_modified_since-%s' % repo['module'], current_date) +def _process_repo_reviews(repo, runtime_storage_inst, record_processor_inst, + rcs_inst): + for branch in _get_repo_branches(repo): + LOG.info('Processing reviews for repo: %s, branch: %s', + repo['uri'], branch) + + quoted_uri = six.moves.urllib.parse.quote_plus(repo['uri']) + rcs_key = 'rcs:%s:%s' % (quoted_uri, branch) + last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key) + current_retrieval_time = utils.date_to_timestamp('now') + + review_iterator = itertools.chain( + rcs_inst.log(repo, branch, last_retrieval_time, status='open'), + rcs_inst.log(repo, branch, last_retrieval_time, status='merged'), + rcs_inst.log(repo, branch, last_retrieval_time, status='abandoned', + grab_comments=True), ) + + review_iterator_typed = _record_typer(review_iterator, 'review') + processed_review_iterator = record_processor_inst.process( + review_iterator_typed) + + runtime_storage_inst.set_records(processed_review_iterator, + utils.merge_records) + runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time) + + +def _process_repo_ci_votes(repo, runtime_storage_inst, record_processor_inst, + rcs_inst): + for branch in _get_repo_branches(repo): + LOG.info('Processing CI votes for repo: %s, branch: %s', + repo['uri'], branch) + + quoted_uri = six.moves.urllib.parse.quote_plus(repo['uri']) + rcs_key = 'ci:%s:%s' % (quoted_uri, branch) + last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key) + current_retrieval_time = utils.date_to_timestamp('now') + + review_iterator = rcs_inst.log(repo, branch, last_retrieval_time, + status='merged', grab_comments=True) + review_iterator = driverlog.log(review_iterator, repo['drivers']) + review_iterator_typed = _record_typer(review_iterator, 'ci') + processed_review_iterator = record_processor_inst.process( + review_iterator_typed) + + runtime_storage_inst.set_records(processed_review_iterator, + utils.merge_records) + runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time) + + +def _process_repo_vcs(repo, runtime_storage_inst, record_processor_inst): vcs_inst = vcs.get_vcs(repo, CONF.sources_root) vcs_inst.fetch() - branches = {repo.get('default_branch', 'master')} - for release in repo.get('releases', []): - if 'branch' in release: - branches.add(release['branch']) - - for branch in branches: - LOG.info('Processing commits in repo: %s, branch: %s', uri, branch) + for branch in _get_repo_branches(repo): + LOG.info('Processing commits in repo: %s, branch: %s', + repo['uri'], branch) + quoted_uri = six.moves.urllib.parse.quote_plus(repo['uri']) vcs_key = 'vcs:%s:%s' % (quoted_uri, branch) last_id = runtime_storage_inst.get_by_key(vcs_key) @@ -132,49 +184,24 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst, last_id = vcs_inst.get_last_id(branch) runtime_storage_inst.set_by_key(vcs_key, last_id) - if 'has_gerrit' not in repo: - continue # do not poll reviews for those that do not have them - LOG.info('Processing reviews for repo: %s, branch: %s', uri, branch) +def _process_repo(repo, runtime_storage_inst, record_processor_inst, + rcs_inst): + LOG.info('Processing repo: %s', repo['uri']) - rcs_key = 'rcs:%s:%s' % (quoted_uri, branch) - last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key) - current_retrieval_time = int(time.time()) + _process_repo_vcs(repo, runtime_storage_inst, record_processor_inst) - review_iterator = itertools.chain( - rcs_inst.log(repo, branch, last_retrieval_time, status='open'), - rcs_inst.log(repo, branch, last_retrieval_time, status='merged'), - rcs_inst.log(repo, branch, last_retrieval_time, status='abandoned', - grab_comments=True), - ) - review_iterator_typed = _record_typer(review_iterator, 'review') + _process_repo_bugs(repo, runtime_storage_inst, record_processor_inst) - processed_review_iterator = record_processor_inst.process( - review_iterator_typed) - runtime_storage_inst.set_records(processed_review_iterator, - utils.merge_records) + _process_repo_blueprints(repo, runtime_storage_inst, record_processor_inst) - runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time) + if 'has_gerrit' in repo: + _process_repo_reviews(repo, runtime_storage_inst, + record_processor_inst, rcs_inst) if 'drivers' in repo: - LOG.info('Processing CI votes for repo: %s, branch: %s', - uri, branch) - - rcs_key = 'ci:%s:%s' % (quoted_uri, branch) - last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key) - current_retrieval_time = int(time.time()) - - review_iterator = rcs_inst.log(repo, branch, last_retrieval_time, - status='merged', grab_comments=True) - review_iterator = driverlog.log(review_iterator, repo['drivers']) - review_iterator_typed = _record_typer(review_iterator, 'ci') - - processed_review_iterator = record_processor_inst.process( - review_iterator_typed) - runtime_storage_inst.set_records(processed_review_iterator, - utils.merge_records) - - runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time) + _process_repo_ci_votes(repo, runtime_storage_inst, + record_processor_inst, rcs_inst) def _process_mail_list(uri, runtime_storage_inst, record_processor_inst):