Merge "Optimize memory consumption in disagreement processing"

This commit is contained in:
Jenkins
2014-04-29 12:31:02 +00:00
committed by Gerrit Code Review
2 changed files with 94 additions and 35 deletions

View File

@@ -14,6 +14,7 @@
# limitations under the License.
import bisect
import collections
import copy
import time
@@ -642,46 +643,61 @@ class RecordProcessor(object):
if user['core'] != core_old:
utils.store_user(self.runtime_storage_inst, user)
def _close_patch(self, cores, marks):
if len(marks) < 2:
return
core_mark = 0
for mark in sorted(marks, key=lambda x: x['date'], reverse=True):
if core_mark == 0:
if (mark['module'], mark['branch'], mark['user_id']) in cores:
# mark is from core engineer
core_mark = mark['value']
continue
disagreement = ((core_mark != 0) and
((core_mark < 0 < mark['value']) or
(core_mark > 0 > mark['value'])))
old_disagreement = mark.get('x')
mark['x'] = disagreement
if old_disagreement != disagreement:
yield mark
def _update_marks_with_disagreement(self):
LOG.debug('Process marks to find disagreements')
marks_per_patch = {}
cores = set()
for user in self.runtime_storage_inst.get_all_users():
for (module, branch) in (user['core'] or []):
cores.add((module, branch, user['user_id']))
# map from review_id to current patch and list of marks
marks_per_patch = collections.defaultdict(
lambda: {'patch_number': 0, 'marks': []})
for record in self.runtime_storage_inst.get_all_records():
if record['record_type'] == 'mark' and record['type'] == 'CRVW':
review_id = record['review_id']
patch_number = record['patch']
if (review_id, patch_number) in marks_per_patch:
marks_per_patch[(review_id, patch_number)].append(record)
else:
marks_per_patch[(review_id, patch_number)] = [record]
cores = dict([(user['user_id'], user)
for user in self.runtime_storage_inst.get_all_users()
if user['core']])
if review_id in marks_per_patch:
# review is already seen, check if patch is newer
if (marks_per_patch[review_id]['patch_number'] <
patch_number):
# the patch is new, close the current
for processed in self._close_patch(
cores, marks_per_patch[review_id]['marks']):
yield processed
del marks_per_patch[review_id]
for key, marks in six.iteritems(marks_per_patch):
if len(marks) < 2:
continue
marks_per_patch[review_id]['patch_number'] = patch_number
marks_per_patch[review_id]['marks'].append(record)
core_mark = 0
for mark in sorted(marks, key=lambda x: x['date'], reverse=True):
if core_mark == 0:
user_id = mark['user_id']
if user_id in cores:
user = cores[user_id]
if (mark['module'], mark['branch']) in user['core']:
# mark is from core engineer
core_mark = mark['value']
continue
disagreement = (core_mark != 0) and (
(core_mark < 0 < mark['value']) or
(core_mark > 0 > mark['value']))
old_disagreement = mark.get('x')
mark['x'] = disagreement
if old_disagreement != disagreement:
yield mark
# purge the rest
for marks_patch in marks_per_patch.values():
for processed in self._close_patch(cores, marks_patch['marks']):
yield processed
def update(self, release_index=None):
self.runtime_storage_inst.set_records(