Refactor repo processing code

Prepare the code for repo-based settings for code, reviews,
bug tracker, etc.

Change-Id: Iac62008d87d06f57fe07d43b3d4a89e1211e76e0
This commit is contained in:
Ilya Shakhat
2017-04-10 14:57:43 +04:00
parent 5d79f8aa9f
commit 3be5e1f12f

View File

@@ -19,7 +19,6 @@ from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import psutil import psutil
import six import six
import time
from stackalytics.processor import bps from stackalytics.processor import bps
from stackalytics.processor import config from stackalytics.processor import config
@@ -77,46 +76,99 @@ def _record_typer(record_iterator, record_type):
yield record yield record
def _process_repo(repo, runtime_storage_inst, record_processor_inst, def _get_repo_branches(repo):
rcs_inst): return ({repo.get('default_branch', 'master')} |
uri = repo['uri'] set(r['branch'] for r in repo.get('releases', [])
quoted_uri = six.moves.urllib.parse.quote_plus(uri) if 'branch' in r))
LOG.info('Processing repo uri: %s', uri)
def _process_repo_blueprints(repo, runtime_storage_inst,
record_processor_inst):
LOG.info('Processing blueprints for repo: %s', repo['uri'])
LOG.info('Processing blueprints for repo uri: %s', uri)
bp_iterator = lp.log(repo) bp_iterator = lp.log(repo)
bp_iterator_typed = _record_typer(bp_iterator, 'bp') bp_iterator_typed = _record_typer(bp_iterator, 'bp')
processed_bp_iterator = record_processor_inst.process( processed_bp_iterator = record_processor_inst.process(bp_iterator_typed)
bp_iterator_typed)
runtime_storage_inst.set_records(processed_bp_iterator, runtime_storage_inst.set_records(processed_bp_iterator,
utils.merge_records) utils.merge_records)
LOG.info('Processing bugs for repo uri: %s', uri)
def _process_repo_bugs(repo, runtime_storage_inst, record_processor_inst):
LOG.info('Processing bugs for repo: %s', repo['uri'])
current_date = utils.date_to_timestamp('now') current_date = utils.date_to_timestamp('now')
bug_modified_since = runtime_storage_inst.get_by_key( bug_modified_since = runtime_storage_inst.get_by_key(
'bug_modified_since-%s' % repo['module']) 'bug_modified_since-%s' % repo['module'])
bug_iterator = bps.log(repo, bug_modified_since) bug_iterator = bps.log(repo, bug_modified_since)
bug_iterator_typed = _record_typer(bug_iterator, 'bug') bug_iterator_typed = _record_typer(bug_iterator, 'bug')
processed_bug_iterator = record_processor_inst.process( processed_bug_iterator = record_processor_inst.process(bug_iterator_typed)
bug_iterator_typed)
runtime_storage_inst.set_records(processed_bug_iterator, runtime_storage_inst.set_records(processed_bug_iterator,
utils.merge_records) utils.merge_records)
runtime_storage_inst.set_by_key('bug_modified_since-%s' % repo['module'],
current_date)
runtime_storage_inst.set_by_key(
'bug_modified_since-%s' % repo['module'], current_date)
def _process_repo_reviews(repo, runtime_storage_inst, record_processor_inst,
                          rcs_inst):
    """Retrieve, process and store review records for each repo branch.

    For every branch returned by ``_get_repo_branches`` the reviews with
    status 'open', 'merged' and 'abandoned' are requested from ``rcs_inst``
    (comments are requested only for 'abandoned'), tagged with the record
    type 'review', passed through the record processor and merged into the
    runtime storage. The retrieval timestamp is kept per repo URI and
    branch under the key ``rcs:<quoted uri>:<branch>``.

    :param repo: repo description; ``repo['uri']`` is read here
    :param runtime_storage_inst: storage object providing ``get_by_key``,
        ``set_by_key`` and ``set_records``
    :param record_processor_inst: object whose ``process`` method consumes
        the typed record iterator
    :param rcs_inst: review system client providing ``log``
    """
    # The quoted URI does not depend on the branch, so build it only once.
    quoted_uri = six.moves.urllib.parse.quote_plus(repo['uri'])

    for branch in _get_repo_branches(repo):
        LOG.info('Processing reviews for repo: %s, branch: %s',
                 repo['uri'], branch)

        rcs_key = 'rcs:%s:%s' % (quoted_uri, branch)
        last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key)
        # Captured before the reviews are requested; it is written back
        # only after the records have been stored.
        current_retrieval_time = utils.date_to_timestamp('now')

        review_iterator = itertools.chain(
            rcs_inst.log(repo, branch, last_retrieval_time, status='open'),
            rcs_inst.log(repo, branch, last_retrieval_time, status='merged'),
            rcs_inst.log(repo, branch, last_retrieval_time,
                         status='abandoned', grab_comments=True),
        )
        review_iterator_typed = _record_typer(review_iterator, 'review')
        processed_review_iterator = record_processor_inst.process(
            review_iterator_typed)

        runtime_storage_inst.set_records(processed_review_iterator,
                                         utils.merge_records)
        runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time)
def _process_repo_ci_votes(repo, runtime_storage_inst, record_processor_inst,
                           rcs_inst):
    """Retrieve, process and store CI vote records for each repo branch.

    For every branch returned by ``_get_repo_branches`` the merged reviews
    are requested from ``rcs_inst`` together with their comments, passed
    through ``driverlog.log`` along with ``repo['drivers']``, tagged with
    the record type 'ci', run through the record processor and merged into
    the runtime storage. The retrieval timestamp is kept per repo URI and
    branch under the key ``ci:<quoted uri>:<branch>``.

    :param repo: repo description; ``repo['uri']`` and ``repo['drivers']``
        are read here
    :param runtime_storage_inst: storage object providing ``get_by_key``,
        ``set_by_key`` and ``set_records``
    :param record_processor_inst: object whose ``process`` method consumes
        the typed record iterator
    :param rcs_inst: review system client providing ``log``
    """
    # The quoted URI does not depend on the branch, so build it only once.
    quoted_uri = six.moves.urllib.parse.quote_plus(repo['uri'])

    for branch in _get_repo_branches(repo):
        LOG.info('Processing CI votes for repo: %s, branch: %s',
                 repo['uri'], branch)

        rcs_key = 'ci:%s:%s' % (quoted_uri, branch)
        last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key)
        # Captured before the reviews are requested; it is written back
        # only after the records have been stored.
        current_retrieval_time = utils.date_to_timestamp('now')

        review_iterator = rcs_inst.log(repo, branch, last_retrieval_time,
                                       status='merged', grab_comments=True)
        review_iterator = driverlog.log(review_iterator, repo['drivers'])
        review_iterator_typed = _record_typer(review_iterator, 'ci')
        processed_review_iterator = record_processor_inst.process(
            review_iterator_typed)

        runtime_storage_inst.set_records(processed_review_iterator,
                                         utils.merge_records)
        runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time)
def _process_repo_vcs(repo, runtime_storage_inst, record_processor_inst):
vcs_inst = vcs.get_vcs(repo, CONF.sources_root) vcs_inst = vcs.get_vcs(repo, CONF.sources_root)
vcs_inst.fetch() vcs_inst.fetch()
branches = {repo.get('default_branch', 'master')} for branch in _get_repo_branches(repo):
for release in repo.get('releases', []): LOG.info('Processing commits in repo: %s, branch: %s',
if 'branch' in release: repo['uri'], branch)
branches.add(release['branch'])
for branch in branches:
LOG.info('Processing commits in repo: %s, branch: %s', uri, branch)
quoted_uri = six.moves.urllib.parse.quote_plus(repo['uri'])
vcs_key = 'vcs:%s:%s' % (quoted_uri, branch) vcs_key = 'vcs:%s:%s' % (quoted_uri, branch)
last_id = runtime_storage_inst.get_by_key(vcs_key) last_id = runtime_storage_inst.get_by_key(vcs_key)
@@ -130,49 +182,24 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst,
last_id = vcs_inst.get_last_id(branch) last_id = vcs_inst.get_last_id(branch)
runtime_storage_inst.set_by_key(vcs_key, last_id) runtime_storage_inst.set_by_key(vcs_key, last_id)
if 'has_gerrit' not in repo:
continue # do not poll reviews for those that do not have them
LOG.info('Processing reviews for repo: %s, branch: %s', uri, branch) def _process_repo(repo, runtime_storage_inst, record_processor_inst,
rcs_inst):
LOG.info('Processing repo: %s', repo['uri'])
rcs_key = 'rcs:%s:%s' % (quoted_uri, branch) _process_repo_vcs(repo, runtime_storage_inst, record_processor_inst)
last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key)
current_retrieval_time = int(time.time())
review_iterator = itertools.chain( _process_repo_bugs(repo, runtime_storage_inst, record_processor_inst)
rcs_inst.log(repo, branch, last_retrieval_time, status='open'),
rcs_inst.log(repo, branch, last_retrieval_time, status='merged'),
rcs_inst.log(repo, branch, last_retrieval_time, status='abandoned',
grab_comments=True),
)
review_iterator_typed = _record_typer(review_iterator, 'review')
processed_review_iterator = record_processor_inst.process( _process_repo_blueprints(repo, runtime_storage_inst, record_processor_inst)
review_iterator_typed)
runtime_storage_inst.set_records(processed_review_iterator,
utils.merge_records)
runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time) if 'has_gerrit' in repo:
_process_repo_reviews(repo, runtime_storage_inst,
record_processor_inst, rcs_inst)
if 'drivers' in repo: if 'drivers' in repo:
LOG.info('Processing CI votes for repo: %s, branch: %s', _process_repo_ci_votes(repo, runtime_storage_inst,
uri, branch) record_processor_inst, rcs_inst)
rcs_key = 'ci:%s:%s' % (quoted_uri, branch)
last_retrieval_time = runtime_storage_inst.get_by_key(rcs_key)
current_retrieval_time = int(time.time())
review_iterator = rcs_inst.log(repo, branch, last_retrieval_time,
status='merged', grab_comments=True)
review_iterator = driverlog.log(review_iterator, repo['drivers'])
review_iterator_typed = _record_typer(review_iterator, 'ci')
processed_review_iterator = record_processor_inst.process(
review_iterator_typed)
runtime_storage_inst.set_records(processed_review_iterator,
utils.merge_records)
runtime_storage_inst.set_by_key(rcs_key, current_retrieval_time)
def _process_mail_list(uri, runtime_storage_inst, record_processor_inst): def _process_mail_list(uri, runtime_storage_inst, record_processor_inst):