From 59e5c91e7c6aab8224419e16cfc71d77d7d1989f Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Wed, 7 Aug 2013 17:00:04 +0400 Subject: [PATCH] Poll open reviews Closes bug 1209162 Change-Id: I449631515955fda140b2c62ff0e9eaf5d6884489 --- stackalytics/processor/main.py | 28 ++++++++++++++++++++--- stackalytics/processor/rcs.py | 23 +++++++++++++++---- stackalytics/processor/runtime_storage.py | 5 ++-- 3 files changed, 47 insertions(+), 9 deletions(-) diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index 6e0a4880d..77dc1cbe7 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -73,7 +73,7 @@ def _record_typer(record_iterator, record_type): yield record -def process_repo(repo, runtime_storage, record_processor_inst): +def process_repo(repo, runtime_storage, record_processor_inst, open_reviews): uri = repo['uri'] LOG.debug('Processing repo uri %s' % uri) @@ -104,7 +104,7 @@ def process_repo(repo, runtime_storage, record_processor_inst): rcs_key = 'rcs:' + str(urllib.quote_plus(uri) + ':' + branch) last_id = runtime_storage.get_by_key(rcs_key) - review_iterator = rcs_inst.log(branch, last_id) + review_iterator = rcs_inst.log(branch, last_id, open_reviews) review_iterator_typed = _record_typer(review_iterator, 'review') processed_review_iterator = record_processor_inst.process( review_iterator_typed) @@ -114,13 +114,35 @@ def process_repo(repo, runtime_storage, record_processor_inst): runtime_storage.set_by_key(rcs_key, last_id) +def _open_reviews(runtime_storage_inst): + LOG.debug('Collecting list of open reviews from') + open_reviews = {} + for record in runtime_storage_inst.get_all_records(): + if record['record_type'] == 'review': + if record['open']: + module = record['module'] + if module not in open_reviews: + open_reviews[module] = set([record['sortKey']]) + else: + open_reviews[module].add(record['sortKey']) + return open_reviews + + def update_repos(runtime_storage_inst): repos = runtime_storage_inst.get_by_key('repos') record_processor_inst = record_processor.RecordProcessor( runtime_storage_inst) + open_reviews = _open_reviews(runtime_storage_inst) + for repo in repos: - process_repo(repo, runtime_storage_inst, record_processor_inst) + module = repo['module'] + open_reviews_repo = set() + if module in open_reviews: + open_reviews_repo = open_reviews[module] + + process_repo(repo, runtime_storage_inst, record_processor_inst, + open_reviews_repo) def apply_corrections(uri, runtime_storage_inst): diff --git a/stackalytics/processor/rcs.py b/stackalytics/processor/rcs.py index 47ed4e9d4..93c0ae240 100644 --- a/stackalytics/processor/rcs.py +++ b/stackalytics/processor/rcs.py @@ -35,7 +35,7 @@ class Rcs(object): def setup(self, **kwargs): pass - def log(self, branch, last_id): + def log(self, branch, last_id, open_reviews): return [] def get_last_id(self, branch): @@ -75,15 +75,15 @@ class Gerrit(Rcs): username=self.username) LOG.debug('Successfully connected to Gerrit') - def _get_cmd(self, module, branch, sort_key): + def _get_cmd(self, module, branch, sort_key, limit=PAGE_LIMIT): cmd = ('gerrit query --all-approvals --patch-sets --format JSON ' '%(module)s branch:%(branch)s limit:%(limit)s' % - {'module': module, 'branch': branch, 'limit': PAGE_LIMIT}) + {'module': module, 'branch': branch, 'limit': limit}) if sort_key: cmd += ' resume_sortkey:%016x' % sort_key return cmd - def log(self, branch, last_id): + def log(self, branch, last_id, open_reviews): module = self.repo['module'] LOG.debug('Retrieve reviews from gerrit for project %s', module) @@ -112,6 +112,21 @@ class Gerrit(Rcs): if not proceed: break + # poll open reviews + LOG.debug('Retrieve open reviews from gerrit for project %s', module) + + for sort_key_str in open_reviews: + sort_key = int(sort_key_str, 16) + cmd = self._get_cmd(module, branch, sort_key + 1, limit=1) + LOG.debug('Retrieve review with sortKey %s', sort_key) + stdin, stdout, stderr = self.client.exec_command(cmd) + + for line in stdout: + review = json.loads(line) + if 'sortKey' in review: + review['module'] = module + yield review + self.client.close() def get_last_id(self, branch): diff --git a/stackalytics/processor/runtime_storage.py b/stackalytics/processor/runtime_storage.py index 4c8474709..e7159599d 100644 --- a/stackalytics/processor/runtime_storage.py +++ b/stackalytics/processor/runtime_storage.py @@ -75,13 +75,14 @@ class MemcachedStorage(RuntimeStorage): record_id = self.record_index[record['primary_key']] if not merge_handler: record['record_id'] = record_id + LOG.debug('Update record %s', record) self.memcached.set(self._get_record_name(record_id), record) else: original = self.memcached.get(self._get_record_name( record_id)) if merge_handler(original, record): - LOG.debug('Update record %s' % record) + LOG.debug('Update record with merge %s', record) self.memcached.set(self._get_record_name(record_id), original) else: @@ -89,7 +90,7 @@ class MemcachedStorage(RuntimeStorage): record_id = self._get_record_count() record['record_id'] = record_id self.record_index[record['primary_key']] = record_id - LOG.debug('Insert new record %s' % record) + LOG.debug('Insert new record %s', record) self.memcached.set(self._get_record_name(record_id), record) self._set_record_count(record_id + 1)