diff --git a/stackalytics/processor/bps.py b/stackalytics/processor/bps.py index 667346bb2..f98dcc9c2 100644 --- a/stackalytics/processor/bps.py +++ b/stackalytics/processor/bps.py @@ -29,7 +29,7 @@ def _get_bug_id(web_link): return web_link[web_link.rfind('/') + 1:] -def log(repo, last_bug_date): +def log(repo, modified_since): module = repo['module'] LOG.debug('Retrieving list of bugs for module: %s', module) @@ -38,7 +38,7 @@ def log(repo, last_bug_date): return for record_draft in launchpad_utils.lp_bug_generator(module, - last_bug_date): + modified_since): record = {} diff --git a/stackalytics/processor/default_data_processor.py b/stackalytics/processor/default_data_processor.py index 4a56fce3b..5d629abd8 100644 --- a/stackalytics/processor/default_data_processor.py +++ b/stackalytics/processor/default_data_processor.py @@ -22,9 +22,7 @@ import six from stackalytics.openstack.common import log as logging from stackalytics.processor import normalizer -from stackalytics.processor import record_processor from stackalytics.processor import utils -from stackalytics.processor import vcs LOG = logging.getLogger(__name__) @@ -162,67 +160,10 @@ def _store_default_data(runtime_storage_inst, default_data): runtime_storage_inst.set_by_key(key, value) -def _update_records(runtime_storage_inst, sources_root): - LOG.debug('Update existing records') - release_index = {} - for repo in utils.load_repos(runtime_storage_inst): - vcs_inst = vcs.get_vcs(repo, sources_root) - release_index.update(vcs_inst.fetch()) - - record_processor_inst = record_processor.RecordProcessor( - runtime_storage_inst) - record_processor_inst.update(release_index) - - -def _get_changed_member_records(runtime_storage_inst, record_processor_inst): - for record in runtime_storage_inst.get_all_records(): - if record['record_type'] == 'member' and 'company_name' in record: - company_draft = record['company_draft'] - company_name = record_processor_inst.domains_index.get( - utils.normalize_company_name(company_draft)) or ( - utils.normalize_company_draft(company_draft)) - - if company_name != record['company_name']: - record['company_name'] = company_name - yield record - - -def _update_members_company_name(runtime_storage_inst): - LOG.debug('Update company names for members') - record_processor_inst = record_processor.RecordProcessor( - runtime_storage_inst) - member_iterator = _get_changed_member_records(runtime_storage_inst, - record_processor_inst) - - for record in member_iterator: - company_name = record['company_name'] - user = utils.load_user(runtime_storage_inst, record['user_id']) - - user['companies'] = [{ - 'company_name': company_name, - 'end_date': 0, - }] - user['company_name'] = company_name - - utils.store_user(runtime_storage_inst, user) - - LOG.debug('Company name changed for user %s', user) - - record_id = record['record_id'] - runtime_storage_inst.memcached.set( - runtime_storage_inst._get_record_name(record_id), record) - runtime_storage_inst._commit_update(record_id) - - -def process(runtime_storage_inst, default_data, sources_root, force_update): +def process(runtime_storage_inst, default_data): LOG.debug('Process default data') - dd_changed = _check_default_data_change(runtime_storage_inst, default_data) - if 'project_sources' in default_data: _update_project_list(default_data) - if dd_changed or force_update: - _store_default_data(runtime_storage_inst, default_data) - _update_records(runtime_storage_inst, sources_root) - _update_members_company_name(runtime_storage_inst) + _store_default_data(runtime_storage_inst, default_data) diff --git a/stackalytics/processor/launchpad_utils.py b/stackalytics/processor/launchpad_utils.py index d633412c2..7de4bb76e 100644 --- a/stackalytics/processor/launchpad_utils.py +++ b/stackalytics/processor/launchpad_utils.py @@ -76,12 +76,12 @@ def lp_blueprint_generator(module): uri = chunk.get('next_collection_link') -def lp_bug_generator(module, last_bug_date): +def lp_bug_generator(module, modified_since): uri = LP_URI_DEVEL % (module + '?ws.op=searchTasks') for status in BUG_STATUSES: uri += '&status=' + six.moves.urllib.parse.quote_plus(status) - if last_bug_date: - uri += '&modified_since=' + last_bug_date + if modified_since: + uri += '&modified_since=' + utils.timestamp_to_utc_date(modified_since) while uri: LOG.debug('Reading chunk from uri %s', uri) diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index f141c23fd..936431afb 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -19,7 +19,6 @@ from oslo.config import cfg import psutil import six from six.moves.urllib import parse -import time import yaml from stackalytics.openstack.common import log as logging @@ -78,11 +77,12 @@ def _record_typer(record_iterator, record_type): yield record -def process_repo(repo, runtime_storage_inst, record_processor_inst, - last_bug_date): +def _process_repo(repo, runtime_storage_inst, record_processor_inst, + bug_modified_since): uri = repo['uri'] - LOG.debug('Processing repo uri %s' % uri) + LOG.info('Processing repo uri: %s', uri) + LOG.debug('Processing blueprints for repo uri: %s', uri) bp_iterator = lp.log(repo) bp_iterator_typed = _record_typer(bp_iterator, 'bp') processed_bp_iterator = record_processor_inst.process( @@ -90,7 +90,8 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst, runtime_storage_inst.set_records(processed_bp_iterator, utils.merge_records) - bug_iterator = bps.log(repo, last_bug_date) + LOG.debug('Processing bugs for repo uri: %s', uri) + bug_iterator = bps.log(repo, bug_modified_since) bug_iterator_typed = _record_typer(bug_iterator, 'bug') processed_bug_iterator = record_processor_inst.process( bug_iterator_typed) @@ -110,7 +111,7 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst, branches.add(release['branch']) for branch in branches: - LOG.debug('Processing repo %s, branch %s', uri, branch) + LOG.debug('Processing commits in repo: %s, branch: %s', uri, branch) vcs_key = 'vcs:' + str(parse.quote_plus(uri) + ':' + branch) last_id = runtime_storage_inst.get_by_key(vcs_key) @@ -125,7 +126,7 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst, last_id = vcs_inst.get_last_id(branch) runtime_storage_inst.set_by_key(vcs_key, last_id) - LOG.debug('Processing reviews for repo %s, branch %s', uri, branch) + LOG.debug('Processing reviews for repo: %s, branch: %s', uri, branch) rcs_key = 'rcs:' + str(parse.quote_plus(uri) + ':' + branch) last_id = runtime_storage_inst.get_by_key(rcs_key) @@ -141,7 +142,7 @@ def process_repo(repo, runtime_storage_inst, record_processor_inst, runtime_storage_inst.set_by_key(rcs_key, last_id) -def process_mail_list(uri, runtime_storage_inst, record_processor_inst): +def _process_mail_list(uri, runtime_storage_inst, record_processor_inst): mail_iterator = mls.log(uri, runtime_storage_inst) mail_iterator_typed = _record_typer(mail_iterator, 'email') processed_mail_iterator = record_processor_inst.process( @@ -149,7 +150,7 @@ def process_mail_list(uri, runtime_storage_inst, record_processor_inst): runtime_storage_inst.set_records(processed_mail_iterator) -def process_member_list(uri, runtime_storage_inst, record_processor_inst): +def _process_member_list(uri, runtime_storage_inst, record_processor_inst): member_iterator = mps.log(uri, runtime_storage_inst, cfg.CONF.days_to_update_members) member_iterator_typed = _record_typer(member_iterator, 'member') @@ -161,26 +162,38 @@ def process_member_list(uri, runtime_storage_inst, record_processor_inst): def update_members(runtime_storage_inst, record_processor_inst): member_lists = runtime_storage_inst.get_by_key('member_lists') or [] for member_list in member_lists: - process_member_list(member_list, runtime_storage_inst, - record_processor_inst) + _process_member_list(member_list, runtime_storage_inst, + record_processor_inst) -def update_records(runtime_storage_inst, record_processor_inst): +def _post_process_records(record_processor_inst, repos): + LOG.debug('Build release index') + release_index = {} + for repo in repos: + vcs_inst = vcs.get_vcs(repo, cfg.CONF.sources_root) + release_index.update(vcs_inst.fetch()) + + LOG.debug('Post-process all records') + record_processor_inst.post_processing(release_index) + + +def process(runtime_storage_inst, record_processor_inst): repos = utils.load_repos(runtime_storage_inst) - current_date = utils.timestamp_to_utc_date(int(time.time())) - last_bug_date = runtime_storage_inst.get_by_key('last_bug_date') + current_date = utils.date_to_timestamp('now') + bug_modified_since = runtime_storage_inst.get_by_key('bug_modified_since') for repo in repos: - process_repo(repo, runtime_storage_inst, record_processor_inst, - last_bug_date) - runtime_storage_inst.set_by_key('last_bug_date', current_date) + _process_repo(repo, runtime_storage_inst, record_processor_inst, + bug_modified_since) + runtime_storage_inst.set_by_key('bug_modified_since', current_date) + LOG.info('Processing mail lists') mail_lists = runtime_storage_inst.get_by_key('mail_lists') or [] for mail_list in mail_lists: - process_mail_list(mail_list, runtime_storage_inst, - record_processor_inst) + _process_mail_list(mail_list, runtime_storage_inst, + record_processor_inst) - record_processor_inst.update() + _post_process_records(record_processor_inst, repos) def apply_corrections(uri, runtime_storage_inst): @@ -290,9 +303,7 @@ def main(): LOG.critical('Unable to load default data') return not 0 default_data_processor.process(runtime_storage_inst, - default_data, - cfg.CONF.sources_root, - cfg.CONF.force_update) + default_data) process_program_list(runtime_storage_inst, cfg.CONF.program_list_uri) @@ -301,7 +312,7 @@ def main(): record_processor_inst = record_processor.RecordProcessor( runtime_storage_inst) - update_records(runtime_storage_inst, record_processor_inst) + process(runtime_storage_inst, record_processor_inst) apply_corrections(cfg.CONF.corrections_uri, runtime_storage_inst) diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index 23a0cbdad..3562991ce 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -542,6 +542,8 @@ class RecordProcessor(object): yield record def _update_commits_with_merge_date(self): + LOG.debug('Update commits with merge date') + change_id_to_date = {} for record in self.runtime_storage_inst.get_all_records(): if (record['record_type'] == 'review' and @@ -728,13 +730,46 @@ class RecordProcessor(object): for processed in self._close_patch(cores, marks_patch['marks']): yield processed - def update(self, release_index=None): + def _update_members_company_name(self): + LOG.debug('Update members with company names') + + for record in self.runtime_storage_inst.get_all_records(): + if record['record_type'] != 'member': + continue + + company_draft = record['company_draft'] + company_name = self.domains_index.get( + utils.normalize_company_name(company_draft)) or ( + utils.normalize_company_draft(company_draft)) + + if company_name == record['company_name']: + continue + + LOG.debug('Update record %s, company name changed to %s', + record, company_name) + record['company_name'] = company_name + + yield record + + user = utils.load_user(self.runtime_storage_inst, + record['user_id']) + LOG.debug('Update user %s, company name changed to %s', + user, company_name) + user['companies'] = [{ + 'company_name': company_name, + 'end_date': 0, + }] + utils.store_user(self.runtime_storage_inst, user) + + def post_processing(self, release_index): self.runtime_storage_inst.set_records( self._update_records_with_user_info()) - if release_index: - self.runtime_storage_inst.set_records( - self._update_records_with_releases(release_index)) + self.runtime_storage_inst.set_records( + self._update_commits_with_merge_date()) + + self.runtime_storage_inst.set_records( + self._update_records_with_releases(release_index)) self.runtime_storage_inst.set_records( self._update_reviews_with_sequence_number()) @@ -742,11 +777,11 @@ class RecordProcessor(object): self.runtime_storage_inst.set_records( self._update_blueprints_with_mention_info()) - self.runtime_storage_inst.set_records( - self._update_commits_with_merge_date()) - self._determine_core_contributors() # disagreement calculation must go after determining core contributors self.runtime_storage_inst.set_records( self._update_marks_with_disagreement()) + + self.runtime_storage_inst.set_records( + self._update_members_company_name()) diff --git a/tests/unit/test_record_processor.py b/tests/unit/test_record_processor.py index bcdc98fb2..eda41798b 100644 --- a/tests/unit/test_record_processor.py +++ b/tests/unit/test_record_processor.py @@ -782,7 +782,7 @@ class TestRecordProcessor(testtools.TestCase): 'module': 'nova', 'branch': 'master'} ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) user = {'seq': 2, 'core': [], @@ -853,7 +853,7 @@ class TestRecordProcessor(testtools.TestCase): }]} ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) user_1 = {'seq': 1, 'user_id': 'john_doe', 'launchpad_id': 'john_doe', 'user_name': 'John Doe', @@ -942,7 +942,7 @@ class TestRecordProcessor(testtools.TestCase): 'date': 1234567895, 'blueprint_id': ['mod:blueprint', 'mod:invalid']}, ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) bp1 = runtime_storage_inst.get_by_primary_key('bpd:mod:blueprint') self.assertEqual(2, bp1['mention_count']) @@ -978,7 +978,7 @@ class TestRecordProcessor(testtools.TestCase): 'createdOn': 5, 'module': 'glance', 'branch': 'master'}, ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) review1 = runtime_storage_inst.get_by_primary_key('I111') self.assertEqual(2, review1['review_number']) @@ -1065,7 +1065,7 @@ class TestRecordProcessor(testtools.TestCase): } ]} ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) marks = list([r for r in runtime_storage_inst.get_all_records() if r['record_type'] == 'mark']) @@ -1110,7 +1110,7 @@ class TestRecordProcessor(testtools.TestCase): 'status': 'MERGED', 'module': 'nova', 'branch': 'master'}, ])) - record_processor_inst.update() + record_processor_inst.post_processing({}) commit = runtime_storage_inst.get_by_primary_key('de7e8f2') self.assertEqual(1385490000, commit['date'])