Merge "Refactor repo processing code"

Author: Jenkins
Committed by: Gerrit Code Review
Date: 2017-05-11 11:03:25 +00:00


@@ -20,7 +20,6 @@ from oslo_config import cfg
 from oslo_log import log as logging
 import psutil
 import six
-import time
 
 from stackalytics.processor import bps
 from stackalytics.processor import config
@@ -79,46 +78,99 @@ def _record_typer(record_iterator, record_type):
         yield record
 
 
-def _process_repo(repo, runtime_storage_inst, record_processor_inst,
-                  rcs_inst):
-    uri = repo['uri']
-    quoted_uri = six.moves.urllib.parse.quote_plus(uri)
-    LOG.info('Processing repo uri: %s', uri)
+def _get_repo_branches(repo):
+    return ({repo.get('default_branch', 'master')} |
+            set(r['branch'] for r in repo.get('releases', [])
+                if 'branch' in r))
 
-    LOG.info('Processing blueprints for repo uri: %s', uri)
+
+def _process_repo_blueprints(repo, runtime_storage_inst,
+                             record_processor_inst):
+    LOG.info('Processing blueprints for repo: %s', repo['uri'])
+
     bp_iterator = lp.log(repo)
     bp_iterator_typed = _record_typer(bp_iterator, 'bp')
-    processed_bp_iterator = record_processor_inst.process(
-        bp_iterator_typed)
+    processed_bp_iterator = record_processor_inst.process(bp_iterator_typed)
+
     runtime_storage_inst.set_records(processed_bp_iterator,
                                      utils.merge_records)
 
-    LOG.info('Processing bugs for repo uri: %s', uri)
+
+def _process_repo_bugs(repo, runtime_storage_inst, record_processor_inst):
+    LOG.info('Processing bugs for repo: %s', repo['uri'])
+
     current_date = utils.date_to_timestamp('now')
     bug_modified_since = runtime_storage_inst.get_by_key(
         'bug_modified_since-%s' % repo['module'])
+
     bug_iterator = bps.log(repo, bug_modified_since)
     bug_iterator_typed = _record_typer(bug_iterator, 'bug')
-    processed_bug_iterator = record_processor_inst.process(
-        bug_iterator_typed)
+    processed_bug_iterator = record_processor_inst.process(bug_iterator_typed)
+
     runtime_storage_inst.set_records(processed_bug_iterator,
                                      utils.merge_records)
-    runtime_storage_inst.set_by_key('bug_modified_since-%s' % repo['module'],
-                                    current_date)
+    runtime_storage_inst.set_by_key(
+        'bug_modified_since-%s' % repo['module'], current_date)
+
+
+def _process_repo_reviews(repo, runtime_storage_inst, record_processor_inst,
+                          rcs_inst):
+    for branch in _get_repo_branches(repo):
+        LOG.info('Processing reviews for repo: %s, branch: %s',
+                 repo['uri'], branch)
+
+        quoted_uri = six.moves.urllib.parse.quote_plus(repo['uri'])
+        rcs_key = 'rcs:%s:%s' % (quoted_uri, branch)
+        last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key)
+        current_retrieval_time = utils.date_to_timestamp('now')
+
+        review_iterator = itertools.chain(
+            rcs_inst.log(repo, branch, last_retrieval_time, status='open'),
+            rcs_inst.log(repo, branch, last_retrieval_time, status='merged'),
+            rcs_inst.log(repo, branch, last_retrieval_time, status='abandoned',
+                         grab_comments=True), )
+        review_iterator_typed = _record_typer(review_iterator, 'review')
+
+        processed_review_iterator = record_processor_inst.process(
+            review_iterator_typed)
+        runtime_storage_inst.set_records(processed_review_iterator,
+                                         utils.merge_records)
+
+        runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time)
+
+
+def _process_repo_ci_votes(repo, runtime_storage_inst, record_processor_inst,
+                           rcs_inst):
+    for branch in _get_repo_branches(repo):
+        LOG.info('Processing CI votes for repo: %s, branch: %s',
+                 repo['uri'], branch)
+
+        quoted_uri = six.moves.urllib.parse.quote_plus(repo['uri'])
+        rcs_key = 'ci:%s:%s' % (quoted_uri, branch)
+        last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key)
+        current_retrieval_time = utils.date_to_timestamp('now')
+
+        review_iterator = rcs_inst.log(repo, branch, last_retrieval_time,
+                                       status='merged', grab_comments=True)
+        review_iterator = driverlog.log(review_iterator, repo['drivers'])
+        review_iterator_typed = _record_typer(review_iterator, 'ci')
+
+        processed_review_iterator = record_processor_inst.process(
+            review_iterator_typed)
+        runtime_storage_inst.set_records(processed_review_iterator,
+                                         utils.merge_records)
+
+        runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time)
 
+
+def _process_repo_vcs(repo, runtime_storage_inst, record_processor_inst):
     vcs_inst = vcs.get_vcs(repo, CONF.sources_root)
     vcs_inst.fetch()
 
-    branches = {repo.get('default_branch', 'master')}
-    for release in repo.get('releases', []):
-        if 'branch' in release:
-            branches.add(release['branch'])
-
-    for branch in branches:
-        LOG.info('Processing commits in repo: %s, branch: %s', uri, branch)
+    for branch in _get_repo_branches(repo):
+        LOG.info('Processing commits in repo: %s, branch: %s',
+                 repo['uri'], branch)
 
+        quoted_uri = six.moves.urllib.parse.quote_plus(repo['uri'])
         vcs_key = 'vcs:%s:%s' % (quoted_uri, branch)
         last_id = runtime_storage_inst.get_by_key(vcs_key)
@@ -132,49 +184,24 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
         last_id = vcs_inst.get_last_id(branch)
         runtime_storage_inst.set_by_key(vcs_key, last_id)
 
-        if 'has_gerrit' not in repo:
-            continue  # do not poll reviews for those that do not have them
-
-        LOG.info('Processing reviews for repo: %s, branch: %s', uri, branch)
-
-        rcs_key = 'rcs:%s:%s' % (quoted_uri, branch)
-        last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key)
-        current_retrieval_time = int(time.time())
-
-        review_iterator = itertools.chain(
-            rcs_inst.log(repo, branch, last_retrieval_time, status='open'),
-            rcs_inst.log(repo, branch, last_retrieval_time, status='merged'),
-            rcs_inst.log(repo, branch, last_retrieval_time, status='abandoned',
-                         grab_comments=True),
-        )
-        review_iterator_typed = _record_typer(review_iterator, 'review')
-
-        processed_review_iterator = record_processor_inst.process(
-            review_iterator_typed)
-        runtime_storage_inst.set_records(processed_review_iterator,
-                                         utils.merge_records)
-
-        runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time)
-
-        if 'drivers' in repo:
-            LOG.info('Processing CI votes for repo: %s, branch: %s',
-                     uri, branch)
-
-            rcs_key = 'ci:%s:%s' % (quoted_uri, branch)
-            last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key)
-            current_retrieval_time = int(time.time())
-
-            review_iterator = rcs_inst.log(repo, branch, last_retrieval_time,
-                                           status='merged', grab_comments=True)
-            review_iterator = driverlog.log(review_iterator, repo['drivers'])
-            review_iterator_typed = _record_typer(review_iterator, 'ci')
-
-            processed_review_iterator = record_processor_inst.process(
-                review_iterator_typed)
-            runtime_storage_inst.set_records(processed_review_iterator,
-                                             utils.merge_records)
-
-            runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time)
+
+def _process_repo(repo, runtime_storage_inst, record_processor_inst,
+                  rcs_inst):
+    LOG.info('Processing repo: %s', repo['uri'])
+
+    _process_repo_vcs(repo, runtime_storage_inst, record_processor_inst)
+
+    _process_repo_bugs(repo, runtime_storage_inst, record_processor_inst)
+
+    _process_repo_blueprints(repo, runtime_storage_inst, record_processor_inst)
+
+    if 'has_gerrit' in repo:
+        _process_repo_reviews(repo, runtime_storage_inst,
+                              record_processor_inst, rcs_inst)
+
+    if 'drivers' in repo:
+        _process_repo_ci_votes(repo, runtime_storage_inst,
+                               record_processor_inst, rcs_inst)
 
 
 def _process_mail_list(uri, runtime_storage_inst, record_processor_inst):
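
For reference, the branch-collection helper introduced above can be exercised on its own. The sketch below copies _get_repo_branches verbatim from the new code and applies it to a made-up repo dictionary; the URI and release entries are illustrative only and are not part of this change.

    def _get_repo_branches(repo):
        # Default branch plus every release branch that is explicitly listed.
        return ({repo.get('default_branch', 'master')} |
                set(r['branch'] for r in repo.get('releases', [])
                    if 'branch' in r))

    # Hypothetical repo definition used only to illustrate the helper.
    repo = {
        'uri': 'git://example.org/openstack/sample-project',
        'default_branch': 'master',
        'releases': [
            {'release_name': 'ocata', 'branch': 'stable/ocata'},
            {'release_name': 'pike'},  # no 'branch' key, so it adds nothing
        ],
    }

    print(_get_repo_branches(repo))  # -> {'master', 'stable/ocata'}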