From 368635c6f7353078e32bf52144af7a901fa68149 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Mon, 16 Jun 2014 22:00:48 +0400 Subject: [PATCH] Post-process all records every time the processor runs Post-processing includes several operations on full set of records. For commits it updates the date with merge date and updates release tag, definitely these operations need to be run after every run of the processor, not only when default data changes. Also this patch includes refactoring of members update and minor renamings. Closes bug 1330483 Change-Id: If5968172fb9011dd5c66bf549206e805bea67f6d --- stackalytics/processor/bps.py | 4 +- .../processor/default_data_processor.py | 63 +------------------ stackalytics/processor/launchpad_utils.py | 6 +- stackalytics/processor/main.py | 59 ++++++++++------- stackalytics/processor/record_processor.py | 49 ++++++++++++--- tests/unit/test_record_processor.py | 12 ++-- 6 files changed, 90 insertions(+), 103 deletions(-) diff --git a/stackalytics/processor/bps.py b/stackalytics/processor/bps.py index 667346bb2..f98dcc9c2 100644 --- a/stackalytics/processor/bps.py +++ b/stackalytics/processor/bps.py @@ -29,7 +29,7 @@ def _get_bug_id(web_link): return web_link[web_link.rfind('/') + 1:] -def log(repo, last_bug_date): +def log(repo, modified_since): module = repo['module'] LOG.debug('Retrieving list of bugs for module: %s', module) @@ -38,7 +38,7 @@ def log(repo, last_bug_date): return for record_draft in launchpad_utils.lp_bug_generator(module, - last_bug_date): + modified_since): record = {} diff --git a/stackalytics/processor/default_data_processor.py b/stackalytics/processor/default_data_processor.py index 4a56fce3b..5d629abd8 100644 --- a/stackalytics/processor/default_data_processor.py +++ b/stackalytics/processor/default_data_processor.py @@ -22,9 +22,7 @@ import six from stackalytics.openstack.common import log as logging from stackalytics.processor import normalizer -from stackalytics.processor import record_processor from
stackalytics.processor import utils -from stackalytics.processor import vcs LOG = logging.getLogger(__name__) @@ -162,67 +160,10 @@ def _store_default_data(runtime_storage_inst, default_data): runtime_storage_inst.set_by_key(key, value) -def _update_records(runtime_storage_inst, sources_root): - LOG.debug('Update existing records') - release_index = {} - for repo in utils.load_repos(runtime_storage_inst): - vcs_inst = vcs.get_vcs(repo, sources_root) - release_index.update(vcs_inst.fetch()) - - record_processor_inst = record_processor.RecordProcessor( - runtime_storage_inst) - record_processor_inst.update(release_index) - - -def _get_changed_member_records(runtime_storage_inst, record_processor_inst): - for record in runtime_storage_inst.get_all_records(): - if record['record_type'] == 'member' and 'company_name' in record: - company_draft = record['company_draft'] - company_name = record_processor_inst.domains_index.get( - utils.normalize_company_name(company_draft)) or ( - utils.normalize_company_draft(company_draft)) - - if company_name != record['company_name']: - record['company_name'] = company_name - yield record - - -def _update_members_company_name(runtime_storage_inst): - LOG.debug('Update company names for members') - record_processor_inst = record_processor.RecordProcessor( - runtime_storage_inst) - member_iterator = _get_changed_member_records(runtime_storage_inst, - record_processor_inst) - - for record in member_iterator: - company_name = record['company_name'] - user = utils.load_user(runtime_storage_inst, record['user_id']) - - user['companies'] = [{ - 'company_name': company_name, - 'end_date': 0, - }] - user['company_name'] = company_name - - utils.store_user(runtime_storage_inst, user) - - LOG.debug('Company name changed for user %s', user) - - record_id = record['record_id'] - runtime_storage_inst.memcached.set( - runtime_storage_inst._get_record_name(record_id), record) - runtime_storage_inst._commit_update(record_id) - - -def 
process(runtime_storage_inst, default_data, sources_root, force_update): +def process(runtime_storage_inst, default_data): LOG.debug('Process default data') - dd_changed = _check_default_data_change(runtime_storage_inst, default_data) - if 'project_sources' in default_data: _update_project_list(default_data) - if dd_changed or force_update: - _store_default_data(runtime_storage_inst, default_data) - _update_records(runtime_storage_inst, sources_root) - _update_members_company_name(runtime_storage_inst) + _store_default_data(runtime_storage_inst, default_data) diff --git a/stackalytics/processor/launchpad_utils.py b/stackalytics/processor/launchpad_utils.py index d633412c2..7de4bb76e 100644 --- a/stackalytics/processor/launchpad_utils.py +++ b/stackalytics/processor/launchpad_utils.py @@ -76,12 +76,12 @@ def lp_blueprint_generator(module): uri = chunk.get('next_collection_link') -def lp_bug_generator(module, last_bug_date): +def lp_bug_generator(module, modified_since): uri = LP_URI_DEVEL % (module + '?ws.op=searchTasks') for status in BUG_STATUSES: uri += '&status=' + six.moves.urllib.parse.quote_plus(status) - if last_bug_date: - uri += '&modified_since=' + last_bug_date + if modified_since: + uri += '&modified_since=' + utils.timestamp_to_utc_date(modified_since) while uri: LOG.debug('Reading chunk from uri %s', uri) diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index f141c23fd..936431afb 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -19,7 +19,6 @@ from oslo.config import cfg import psutil import six from six.moves.urllib import parse -import time import yaml from stackalytics.openstack.common import log as logging @@ -78,11 +77,12 @@ def _record_typer(record_iterator, record_type): yield record -def process_repo(repo, runtime_storage_inst, record_processor_inst, - last_bug_date): +def _process_repo(repo, runtime_storage_inst, record_processor_inst, + bug_modified_since): uri = repo['uri'] - 
LOG.debug('Processing repo uri %s' % uri) + LOG.info('Processing repo uri: %s', uri) + LOG.debug('Processing blueprints for repo uri: %s', uri) bp_iterator = lp.log(repo) bp_iterator_typed = _record_typer(bp_iterator, 'bp') processed_bp_iterator = record_processor_inst.process( @@ -90,7 +90,8 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst, runtime_storage_inst.set_records(processed_bp_iterator, utils.merge_records) - bug_iterator = bps.log(repo, last_bug_date) + LOG.debug('Processing bugs for repo uri: %s', uri) + bug_iterator = bps.log(repo, bug_modified_since) bug_iterator_typed = _record_typer(bug_iterator, 'bug') processed_bug_iterator = record_processor_inst.process( bug_iterator_typed) @@ -110,7 +111,7 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst, branches.add(release['branch']) for branch in branches: - LOG.debug('Processing repo %s, branch %s', uri, branch) + LOG.debug('Processing commits in repo: %s, branch: %s', uri, branch) vcs_key = 'vcs:' + str(parse.quote_plus(uri) + ':' + branch) last_id = runtime_storage_inst.get_by_key(vcs_key) @@ -125,7 +126,7 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst, last_id = vcs_inst.get_last_id(branch) runtime_storage_inst.set_by_key(vcs_key, last_id) - LOG.debug('Processing reviews for repo %s, branch %s', uri, branch) + LOG.debug('Processing reviews for repo: %s, branch: %s', uri, branch) rcs_key = 'rcs:' + str(parse.quote_plus(uri) + ':' + branch) last_id = runtime_storage_inst.get_by_key(rcs_key) @@ -141,7 +142,7 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst, runtime_storage_inst.set_by_key(rcs_key, last_id) -def process_mail_list(uri, runtime_storage_inst, record_processor_inst): +def _process_mail_list(uri, runtime_storage_inst, record_processor_inst): mail_iterator = mls.log(uri, runtime_storage_inst) mail_iterator_typed = _record_typer(mail_iterator, 'email') processed_mail_iterator = record_processor_inst.process( @@ 
-149,7 +150,7 @@ def process_mail_list(uri, runtime_storage_inst, record_processor_inst): runtime_storage_inst.set_records(processed_mail_iterator) -def process_member_list(uri, runtime_storage_inst, record_processor_inst): +def _process_member_list(uri, runtime_storage_inst, record_processor_inst): member_iterator = mps.log(uri, runtime_storage_inst, cfg.CONF.days_to_update_members) member_iterator_typed = _record_typer(member_iterator, 'member') @@ -161,26 +162,38 @@ def process_member_list(uri, runtime_storage_inst, record_processor_inst): def update_members(runtime_storage_inst, record_processor_inst): member_lists = runtime_storage_inst.get_by_key('member_lists') or [] for member_list in member_lists: - process_member_list(member_list, runtime_storage_inst, - record_processor_inst) + _process_member_list(member_list, runtime_storage_inst, + record_processor_inst) -def update_records(runtime_storage_inst, record_processor_inst): +def _post_process_records(record_processor_inst, repos): + LOG.debug('Build release index') + release_index = {} + for repo in repos: + vcs_inst = vcs.get_vcs(repo, cfg.CONF.sources_root) + release_index.update(vcs_inst.fetch()) + + LOG.debug('Post-process all records') + record_processor_inst.post_processing(release_index) + + +def process(runtime_storage_inst, record_processor_inst): repos = utils.load_repos(runtime_storage_inst) - current_date = utils.timestamp_to_utc_date(int(time.time())) - last_bug_date = runtime_storage_inst.get_by_key('last_bug_date') + current_date = utils.date_to_timestamp('now') + bug_modified_since = runtime_storage_inst.get_by_key('bug_modified_since') for repo in repos: - process_repo(repo, runtime_storage_inst, record_processor_inst, - last_bug_date) - runtime_storage_inst.set_by_key('last_bug_date', current_date) + _process_repo(repo, runtime_storage_inst, record_processor_inst, + bug_modified_since) + runtime_storage_inst.set_by_key('bug_modified_since', current_date) + LOG.info('Processing mail 
lists') mail_lists = runtime_storage_inst.get_by_key('mail_lists') or [] for mail_list in mail_lists: - process_mail_list(mail_list, runtime_storage_inst, - record_processor_inst) + _process_mail_list(mail_list, runtime_storage_inst, + record_processor_inst) - record_processor_inst.update() + _post_process_records(record_processor_inst, repos) def apply_corrections(uri, runtime_storage_inst): @@ -290,9 +303,7 @@ def main(): LOG.critical('Unable to load default data') return not 0 default_data_processor.process(runtime_storage_inst, - default_data, - cfg.CONF.sources_root, - cfg.CONF.force_update) + default_data) process_program_list(runtime_storage_inst, cfg.CONF.program_list_uri) @@ -301,7 +312,7 @@ def main(): record_processor_inst = record_processor.RecordProcessor( runtime_storage_inst) - update_records(runtime_storage_inst, record_processor_inst) + process(runtime_storage_inst, record_processor_inst) apply_corrections(cfg.CONF.corrections_uri, runtime_storage_inst) diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index 23a0cbdad..3562991ce 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -542,6 +542,8 @@ class RecordProcessor(object): yield record def _update_commits_with_merge_date(self): + LOG.debug('Update commits with merge date') + change_id_to_date = {} for record in self.runtime_storage_inst.get_all_records(): if (record['record_type'] == 'review' and @@ -728,13 +730,46 @@ class RecordProcessor(object): for processed in self._close_patch(cores, marks_patch['marks']): yield processed - def update(self, release_index=None): + def _update_members_company_name(self): + LOG.debug('Update members with company names') + + for record in self.runtime_storage_inst.get_all_records(): + if record['record_type'] != 'member': + continue + + company_draft = record['company_draft'] + company_name = self.domains_index.get( + 
utils.normalize_company_name(company_draft)) or ( + utils.normalize_company_draft(company_draft)) + + if company_name == record['company_name']: + continue + + LOG.debug('Update record %s, company name changed to %s', + record, company_name) + record['company_name'] = company_name + + yield record + + user = utils.load_user(self.runtime_storage_inst, + record['user_id']) + LOG.debug('Update user %s, company name changed to %s', + user, company_name) + user['companies'] = [{ + 'company_name': company_name, + 'end_date': 0, + }] + utils.store_user(self.runtime_storage_inst, user) + + def post_processing(self, release_index): self.runtime_storage_inst.set_records( self._update_records_with_user_info()) - if release_index: - self.runtime_storage_inst.set_records( - self._update_records_with_releases(release_index)) + self.runtime_storage_inst.set_records( + self._update_commits_with_merge_date()) + + self.runtime_storage_inst.set_records( + self._update_records_with_releases(release_index)) self.runtime_storage_inst.set_records( self._update_reviews_with_sequence_number()) @@ -742,11 +777,11 @@ class RecordProcessor(object): self.runtime_storage_inst.set_records( self._update_blueprints_with_mention_info()) - self.runtime_storage_inst.set_records( - self._update_commits_with_merge_date()) - self._determine_core_contributors() # disagreement calculation must go after determining core contributors self.runtime_storage_inst.set_records( self._update_marks_with_disagreement()) + + self.runtime_storage_inst.set_records( + self._update_members_company_name()) diff --git a/tests/unit/test_record_processor.py b/tests/unit/test_record_processor.py index bcdc98fb2..eda41798b 100644 --- a/tests/unit/test_record_processor.py +++ b/tests/unit/test_record_processor.py @@ -782,7 +782,7 @@ class TestRecordProcessor(testtools.TestCase): 'module': 'nova', 'branch': 'master'} ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) user = {'seq': 2, 'core': [], @@ 
-853,7 +853,7 @@ class TestRecordProcessor(testtools.TestCase): }]} ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) user_1 = {'seq': 1, 'user_id': 'john_doe', 'launchpad_id': 'john_doe', 'user_name': 'John Doe', @@ -942,7 +942,7 @@ class TestRecordProcessor(testtools.TestCase): 'date': 1234567895, 'blueprint_id': ['mod:blueprint', 'mod:invalid']}, ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) bp1 = runtime_storage_inst.get_by_primary_key('bpd:mod:blueprint') self.assertEqual(2, bp1['mention_count']) @@ -978,7 +978,7 @@ class TestRecordProcessor(testtools.TestCase): 'createdOn': 5, 'module': 'glance', 'branch': 'master'}, ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) review1 = runtime_storage_inst.get_by_primary_key('I111') self.assertEqual(2, review1['review_number']) @@ -1065,7 +1065,7 @@ class TestRecordProcessor(testtools.TestCase): } ]} ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) marks = list([r for r in runtime_storage_inst.get_all_records() if r['record_type'] == 'mark']) @@ -1110,7 +1110,7 @@ class TestRecordProcessor(testtools.TestCase): 'status': 'MERGED', 'module': 'nova', 'branch': 'master'}, ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) commit = runtime_storage_inst.get_by_primary_key('de7e8f2') self.assertEqual(1385490000, commit['date'])