Poll open reviews
Closes bug 1209162 Change-Id: I449631515955fda140b2c62ff0e9eaf5d6884489
This commit is contained in:
@@ -73,7 +73,7 @@ def _record_typer(record_iterator, record_type):
|
|||||||
yield record
|
yield record
|
||||||
|
|
||||||
|
|
||||||
def process_repo(repo, runtime_storage, record_processor_inst):
|
def process_repo(repo, runtime_storage, record_processor_inst, open_reviews):
|
||||||
uri = repo['uri']
|
uri = repo['uri']
|
||||||
LOG.debug('Processing repo uri %s' % uri)
|
LOG.debug('Processing repo uri %s' % uri)
|
||||||
|
|
||||||
@@ -104,7 +104,7 @@ def process_repo(repo, runtime_storage, record_processor_inst):
|
|||||||
rcs_key = 'rcs:' + str(urllib.quote_plus(uri) + ':' + branch)
|
rcs_key = 'rcs:' + str(urllib.quote_plus(uri) + ':' + branch)
|
||||||
last_id = runtime_storage.get_by_key(rcs_key)
|
last_id = runtime_storage.get_by_key(rcs_key)
|
||||||
|
|
||||||
review_iterator = rcs_inst.log(branch, last_id)
|
review_iterator = rcs_inst.log(branch, last_id, open_reviews)
|
||||||
review_iterator_typed = _record_typer(review_iterator, 'review')
|
review_iterator_typed = _record_typer(review_iterator, 'review')
|
||||||
processed_review_iterator = record_processor_inst.process(
|
processed_review_iterator = record_processor_inst.process(
|
||||||
review_iterator_typed)
|
review_iterator_typed)
|
||||||
@@ -114,13 +114,35 @@ def process_repo(repo, runtime_storage, record_processor_inst):
|
|||||||
runtime_storage.set_by_key(rcs_key, last_id)
|
runtime_storage.set_by_key(rcs_key, last_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _open_reviews(runtime_storage_inst):
|
||||||
|
LOG.debug('Collecting list of open reviews from')
|
||||||
|
open_reviews = {}
|
||||||
|
for record in runtime_storage_inst.get_all_records():
|
||||||
|
if record['record_type'] == 'review':
|
||||||
|
if record['open']:
|
||||||
|
module = record['module']
|
||||||
|
if module not in open_reviews:
|
||||||
|
open_reviews[module] = set([record['sortKey']])
|
||||||
|
else:
|
||||||
|
open_reviews[module].add(record['sortKey'])
|
||||||
|
return open_reviews
|
||||||
|
|
||||||
|
|
||||||
def update_repos(runtime_storage_inst):
|
def update_repos(runtime_storage_inst):
|
||||||
repos = runtime_storage_inst.get_by_key('repos')
|
repos = runtime_storage_inst.get_by_key('repos')
|
||||||
record_processor_inst = record_processor.RecordProcessor(
|
record_processor_inst = record_processor.RecordProcessor(
|
||||||
runtime_storage_inst)
|
runtime_storage_inst)
|
||||||
|
|
||||||
|
open_reviews = _open_reviews(runtime_storage_inst)
|
||||||
|
|
||||||
for repo in repos:
|
for repo in repos:
|
||||||
process_repo(repo, runtime_storage_inst, record_processor_inst)
|
module = repo['module']
|
||||||
|
open_reviews_repo = set()
|
||||||
|
if module in open_reviews:
|
||||||
|
open_reviews_repo = open_reviews[module]
|
||||||
|
|
||||||
|
process_repo(repo, runtime_storage_inst, record_processor_inst,
|
||||||
|
open_reviews_repo)
|
||||||
|
|
||||||
|
|
||||||
def apply_corrections(uri, runtime_storage_inst):
|
def apply_corrections(uri, runtime_storage_inst):
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ class Rcs(object):
|
|||||||
def setup(self, **kwargs):
|
def setup(self, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def log(self, branch, last_id):
|
def log(self, branch, last_id, open_reviews):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def get_last_id(self, branch):
|
def get_last_id(self, branch):
|
||||||
@@ -75,15 +75,15 @@ class Gerrit(Rcs):
|
|||||||
username=self.username)
|
username=self.username)
|
||||||
LOG.debug('Successfully connected to Gerrit')
|
LOG.debug('Successfully connected to Gerrit')
|
||||||
|
|
||||||
def _get_cmd(self, module, branch, sort_key):
|
def _get_cmd(self, module, branch, sort_key, limit=PAGE_LIMIT):
|
||||||
cmd = ('gerrit query --all-approvals --patch-sets --format JSON '
|
cmd = ('gerrit query --all-approvals --patch-sets --format JSON '
|
||||||
'%(module)s branch:%(branch)s limit:%(limit)s' %
|
'%(module)s branch:%(branch)s limit:%(limit)s' %
|
||||||
{'module': module, 'branch': branch, 'limit': PAGE_LIMIT})
|
{'module': module, 'branch': branch, 'limit': limit})
|
||||||
if sort_key:
|
if sort_key:
|
||||||
cmd += ' resume_sortkey:%016x' % sort_key
|
cmd += ' resume_sortkey:%016x' % sort_key
|
||||||
return cmd
|
return cmd
|
||||||
|
|
||||||
def log(self, branch, last_id):
|
def log(self, branch, last_id, open_reviews):
|
||||||
module = self.repo['module']
|
module = self.repo['module']
|
||||||
LOG.debug('Retrieve reviews from gerrit for project %s', module)
|
LOG.debug('Retrieve reviews from gerrit for project %s', module)
|
||||||
|
|
||||||
@@ -112,6 +112,21 @@ class Gerrit(Rcs):
|
|||||||
if not proceed:
|
if not proceed:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# poll open reviews
|
||||||
|
LOG.debug('Retrieve open reviews from gerrit for project %s', module)
|
||||||
|
|
||||||
|
for sort_key_str in open_reviews:
|
||||||
|
sort_key = int(sort_key_str, 16)
|
||||||
|
cmd = self._get_cmd(module, branch, sort_key + 1, limit=1)
|
||||||
|
LOG.debug('Retrieve review with sortKey %s', sort_key)
|
||||||
|
stdin, stdout, stderr = self.client.exec_command(cmd)
|
||||||
|
|
||||||
|
for line in stdout:
|
||||||
|
review = json.loads(line)
|
||||||
|
if 'sortKey' in review:
|
||||||
|
review['module'] = module
|
||||||
|
yield review
|
||||||
|
|
||||||
self.client.close()
|
self.client.close()
|
||||||
|
|
||||||
def get_last_id(self, branch):
|
def get_last_id(self, branch):
|
||||||
|
|||||||
@@ -75,13 +75,14 @@ class MemcachedStorage(RuntimeStorage):
|
|||||||
record_id = self.record_index[record['primary_key']]
|
record_id = self.record_index[record['primary_key']]
|
||||||
if not merge_handler:
|
if not merge_handler:
|
||||||
record['record_id'] = record_id
|
record['record_id'] = record_id
|
||||||
|
LOG.debug('Update record %s', record)
|
||||||
self.memcached.set(self._get_record_name(record_id),
|
self.memcached.set(self._get_record_name(record_id),
|
||||||
record)
|
record)
|
||||||
else:
|
else:
|
||||||
original = self.memcached.get(self._get_record_name(
|
original = self.memcached.get(self._get_record_name(
|
||||||
record_id))
|
record_id))
|
||||||
if merge_handler(original, record):
|
if merge_handler(original, record):
|
||||||
LOG.debug('Update record %s' % record)
|
LOG.debug('Update record with merge %s', record)
|
||||||
self.memcached.set(self._get_record_name(record_id),
|
self.memcached.set(self._get_record_name(record_id),
|
||||||
original)
|
original)
|
||||||
else:
|
else:
|
||||||
@@ -89,7 +90,7 @@ class MemcachedStorage(RuntimeStorage):
|
|||||||
record_id = self._get_record_count()
|
record_id = self._get_record_count()
|
||||||
record['record_id'] = record_id
|
record['record_id'] = record_id
|
||||||
self.record_index[record['primary_key']] = record_id
|
self.record_index[record['primary_key']] = record_id
|
||||||
LOG.debug('Insert new record %s' % record)
|
LOG.debug('Insert new record %s', record)
|
||||||
self.memcached.set(self._get_record_name(record_id), record)
|
self.memcached.set(self._get_record_name(record_id), record)
|
||||||
self._set_record_count(record_id + 1)
|
self._set_record_count(record_id + 1)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user