From 42aaa1d77ee4bbc216fadc08bd7f318efc998483 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Mon, 19 Aug 2013 18:07:51 +0400 Subject: [PATCH] Poll open reviews by chunks instead of one by one Closes bug 1213358 Change-Id: Ibcf0e3a2d48e249af68e7d2d1ac27b6507510e47 --- stackalytics/processor/main.py | 28 ++-------------- stackalytics/processor/rcs.py | 58 +++++++++++++++++----------------- 2 files changed, 32 insertions(+), 54 deletions(-) diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index ce151bbb2..2f68f63df 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -73,7 +73,7 @@ def _record_typer(record_iterator, record_type): yield record -def process_repo(repo, runtime_storage, record_processor_inst, open_reviews): +def process_repo(repo, runtime_storage, record_processor_inst): uri = repo['uri'] LOG.debug('Processing repo uri %s' % uri) @@ -104,7 +104,7 @@ def process_repo(repo, runtime_storage, record_processor_inst, open_reviews): rcs_key = 'rcs:' + str(urllib.quote_plus(uri) + ':' + branch) last_id = runtime_storage.get_by_key(rcs_key) - review_iterator = rcs_inst.log(branch, last_id, open_reviews) + review_iterator = rcs_inst.log(branch, last_id) review_iterator_typed = _record_typer(review_iterator, 'review') processed_review_iterator = record_processor_inst.process( review_iterator_typed) @@ -114,35 +114,13 @@ def process_repo(repo, runtime_storage, record_processor_inst, open_reviews): runtime_storage.set_by_key(rcs_key, last_id) -def _open_reviews(runtime_storage_inst): - LOG.debug('Collecting list of open reviews from') - open_reviews = {} - for record in runtime_storage_inst.get_all_records(): - if record['record_type'] == 'review': - if record['open']: - module = record['module'] - if module not in open_reviews: - open_reviews[module] = set([record['sortKey']]) - else: - open_reviews[module].add(record['sortKey']) - return open_reviews - - def update_repos(runtime_storage_inst): repos = runtime_storage_inst.get_by_key('repos') record_processor_inst = record_processor.RecordProcessor( runtime_storage_inst) - open_reviews = _open_reviews(runtime_storage_inst) - for repo in repos: - module = repo['module'] - open_reviews_repo = set() - if module in open_reviews: - open_reviews_repo = open_reviews[module] - - process_repo(repo, runtime_storage_inst, record_processor_inst, - open_reviews_repo) + process_repo(repo, runtime_storage_inst, record_processor_inst) def apply_corrections(uri, runtime_storage_inst): diff --git a/stackalytics/processor/rcs.py b/stackalytics/processor/rcs.py index 3307f33e4..e7e6388cf 100644 --- a/stackalytics/processor/rcs.py +++ b/stackalytics/processor/rcs.py @@ -35,7 +35,7 @@ class Rcs(object): def setup(self, **kwargs): pass - def log(self, branch, last_id, open_reviews): + def log(self, branch, last_id): return [] def get_last_id(self, branch): @@ -76,31 +76,26 @@ class Gerrit(Rcs): LOG.debug('Successfully connected to Gerrit') def _get_cmd(self, project_organization, module, branch, sort_key, - limit=PAGE_LIMIT): + is_open): cmd = ('gerrit query --all-approvals --patch-sets --format JSON ' 'project:\'%(ogn)s/%(module)s\' branch:%(branch)s ' 'limit:%(limit)s' % {'ogn': project_organization, 'module': module, - 'branch': branch, 'limit': limit}) + 'branch': branch, 'limit': PAGE_LIMIT}) + if is_open: + cmd += ' is:open' if sort_key: cmd += ' resume_sortkey:%016x' % sort_key return cmd - def log(self, branch, last_id, open_reviews): - match = re.search(r'([^\/]+)/([^\/]+)\.git$', self.repo['uri']) - if not match: - LOG.error('Invalid repo uri: %s', self.repo['uri']) - project_organization = match.group(1) - module = match.group(2) - LOG.debug('Retrieve reviews from gerrit from organization %s ' - 'for project %s', project_organization, module) - - self._connect() - - sort_key = None + def _poll_reviews(self, project_organization, module, branch, + start_id=None, last_id=None, is_open=False): + sort_key = start_id while True: - cmd = self._get_cmd(project_organization, module, branch, sort_key) + cmd = self._get_cmd(project_organization, module, branch, sort_key, + is_open) + LOG.debug('Executing command: %s', cmd) stdin, stdout, stderr = self.client.exec_command(cmd) proceed = False @@ -120,21 +115,26 @@ class Gerrit(Rcs): if not proceed: break - # poll open reviews - LOG.debug('Retrieve open reviews from gerrit for project %s', module) + def log(self, branch, last_id): + match = re.search(r'([^\/]+)/([^\/]+)\.git$', self.repo['uri']) + if not match: + LOG.error('Invalid repo uri: %s', self.repo['uri']) + project_organization = match.group(1) + module = match.group(2) - for sort_key_str in open_reviews: - sort_key = int(sort_key_str, 16) - cmd = self._get_cmd(project_organization, module, branch, - sort_key + 1, limit=1) - LOG.debug('Retrieve review with sortKey %s', sort_key) - stdin, stdout, stderr = self.client.exec_command(cmd) + self._connect() - for line in stdout: - review = json.loads(line) - if 'sortKey' in review: - review['module'] = module - yield review + # poll new reviews from the top down to last_id + LOG.debug('Poll new reviews') + for review in self._poll_reviews(project_organization, module, branch, + last_id=last_id): + yield review + + # poll open reviews from last_id down to bottom + LOG.debug('Poll open reviews') + for review in self._poll_reviews(project_organization, module, branch, + start_id=last_id + 1, is_open=True): + yield review self.client.close()