Post-process all records every time the processor runs
Post-processing includes several operations on full set of records. For commits it updates the date with merge date and updates release tag, definetely these operations need to be run after every run of the processor, not only when default data changes. Also this patch includes refactoring of members update and minor renamings. Closes bug 1330483 Change-Id: If5968172fb9011dd5c66bf549206e805bea67f6d
This commit is contained in:
@@ -19,7 +19,6 @@ from oslo.config import cfg
|
||||
import psutil
|
||||
import six
|
||||
from six.moves.urllib import parse
|
||||
import time
|
||||
import yaml
|
||||
|
||||
from stackalytics.openstack.common import log as logging
|
||||
@@ -78,11 +77,12 @@ def _record_typer(record_iterator, record_type):
|
||||
yield record
|
||||
|
||||
|
||||
def process_repo(repo, runtime_storage_inst, record_processor_inst,
|
||||
last_bug_date):
|
||||
def _process_repo(repo, runtime_storage_inst, record_processor_inst,
|
||||
bug_modified_since):
|
||||
uri = repo['uri']
|
||||
LOG.debug('Processing repo uri %s' % uri)
|
||||
LOG.info('Processing repo uri: %s', uri)
|
||||
|
||||
LOG.debug('Processing blueprints for repo uri: %s', uri)
|
||||
bp_iterator = lp.log(repo)
|
||||
bp_iterator_typed = _record_typer(bp_iterator, 'bp')
|
||||
processed_bp_iterator = record_processor_inst.process(
|
||||
@@ -90,7 +90,8 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst,
|
||||
runtime_storage_inst.set_records(processed_bp_iterator,
|
||||
utils.merge_records)
|
||||
|
||||
bug_iterator = bps.log(repo, last_bug_date)
|
||||
LOG.debug('Processing bugs for repo uri: %s', uri)
|
||||
bug_iterator = bps.log(repo, bug_modified_since)
|
||||
bug_iterator_typed = _record_typer(bug_iterator, 'bug')
|
||||
processed_bug_iterator = record_processor_inst.process(
|
||||
bug_iterator_typed)
|
||||
@@ -110,7 +111,7 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst,
|
||||
branches.add(release['branch'])
|
||||
|
||||
for branch in branches:
|
||||
LOG.debug('Processing repo %s, branch %s', uri, branch)
|
||||
LOG.debug('Processing commits in repo: %s, branch: %s', uri, branch)
|
||||
|
||||
vcs_key = 'vcs:' + str(parse.quote_plus(uri) + ':' + branch)
|
||||
last_id = runtime_storage_inst.get_by_key(vcs_key)
|
||||
@@ -125,7 +126,7 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst,
|
||||
last_id = vcs_inst.get_last_id(branch)
|
||||
runtime_storage_inst.set_by_key(vcs_key, last_id)
|
||||
|
||||
LOG.debug('Processing reviews for repo %s, branch %s', uri, branch)
|
||||
LOG.debug('Processing reviews for repo: %s, branch: %s', uri, branch)
|
||||
|
||||
rcs_key = 'rcs:' + str(parse.quote_plus(uri) + ':' + branch)
|
||||
last_id = runtime_storage_inst.get_by_key(rcs_key)
|
||||
@@ -141,7 +142,7 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst,
|
||||
runtime_storage_inst.set_by_key(rcs_key, last_id)
|
||||
|
||||
|
||||
def process_mail_list(uri, runtime_storage_inst, record_processor_inst):
|
||||
def _process_mail_list(uri, runtime_storage_inst, record_processor_inst):
|
||||
mail_iterator = mls.log(uri, runtime_storage_inst)
|
||||
mail_iterator_typed = _record_typer(mail_iterator, 'email')
|
||||
processed_mail_iterator = record_processor_inst.process(
|
||||
@@ -149,7 +150,7 @@ def process_mail_list(uri, runtime_storage_inst, record_processor_inst):
|
||||
runtime_storage_inst.set_records(processed_mail_iterator)
|
||||
|
||||
|
||||
def process_member_list(uri, runtime_storage_inst, record_processor_inst):
|
||||
def _process_member_list(uri, runtime_storage_inst, record_processor_inst):
|
||||
member_iterator = mps.log(uri, runtime_storage_inst,
|
||||
cfg.CONF.days_to_update_members)
|
||||
member_iterator_typed = _record_typer(member_iterator, 'member')
|
||||
@@ -161,26 +162,38 @@ def process_member_list(uri, runtime_storage_inst, record_processor_inst):
|
||||
def update_members(runtime_storage_inst, record_processor_inst):
|
||||
member_lists = runtime_storage_inst.get_by_key('member_lists') or []
|
||||
for member_list in member_lists:
|
||||
process_member_list(member_list, runtime_storage_inst,
|
||||
record_processor_inst)
|
||||
_process_member_list(member_list, runtime_storage_inst,
|
||||
record_processor_inst)
|
||||
|
||||
|
||||
def update_records(runtime_storage_inst, record_processor_inst):
|
||||
def _post_process_records(record_processor_inst, repos):
|
||||
LOG.debug('Build release index')
|
||||
release_index = {}
|
||||
for repo in repos:
|
||||
vcs_inst = vcs.get_vcs(repo, cfg.CONF.sources_root)
|
||||
release_index.update(vcs_inst.fetch())
|
||||
|
||||
LOG.debug('Post-process all records')
|
||||
record_processor_inst.post_processing(release_index)
|
||||
|
||||
|
||||
def process(runtime_storage_inst, record_processor_inst):
|
||||
repos = utils.load_repos(runtime_storage_inst)
|
||||
|
||||
current_date = utils.timestamp_to_utc_date(int(time.time()))
|
||||
last_bug_date = runtime_storage_inst.get_by_key('last_bug_date')
|
||||
current_date = utils.date_to_timestamp('now')
|
||||
bug_modified_since = runtime_storage_inst.get_by_key('bug_modified_since')
|
||||
for repo in repos:
|
||||
process_repo(repo, runtime_storage_inst, record_processor_inst,
|
||||
last_bug_date)
|
||||
runtime_storage_inst.set_by_key('last_bug_date', current_date)
|
||||
_process_repo(repo, runtime_storage_inst, record_processor_inst,
|
||||
bug_modified_since)
|
||||
runtime_storage_inst.set_by_key('bug_modified_since', current_date)
|
||||
|
||||
LOG.info('Processing mail lists')
|
||||
mail_lists = runtime_storage_inst.get_by_key('mail_lists') or []
|
||||
for mail_list in mail_lists:
|
||||
process_mail_list(mail_list, runtime_storage_inst,
|
||||
record_processor_inst)
|
||||
_process_mail_list(mail_list, runtime_storage_inst,
|
||||
record_processor_inst)
|
||||
|
||||
record_processor_inst.update()
|
||||
_post_process_records(record_processor_inst, repos)
|
||||
|
||||
|
||||
def apply_corrections(uri, runtime_storage_inst):
|
||||
@@ -290,9 +303,7 @@ def main():
|
||||
LOG.critical('Unable to load default data')
|
||||
return not 0
|
||||
default_data_processor.process(runtime_storage_inst,
|
||||
default_data,
|
||||
cfg.CONF.sources_root,
|
||||
cfg.CONF.force_update)
|
||||
default_data)
|
||||
|
||||
process_program_list(runtime_storage_inst, cfg.CONF.program_list_uri)
|
||||
|
||||
@@ -301,7 +312,7 @@ def main():
|
||||
record_processor_inst = record_processor.RecordProcessor(
|
||||
runtime_storage_inst)
|
||||
|
||||
update_records(runtime_storage_inst, record_processor_inst)
|
||||
process(runtime_storage_inst, record_processor_inst)
|
||||
|
||||
apply_corrections(cfg.CONF.corrections_uri, runtime_storage_inst)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user