Batch sync change by commit tasks

We can end up with a large number of sequential sync change by
commit tasks which result in a series of simple gerrit queries
each with their own HTTP request.  Instead, combine them into
batches of 100 commits that are queried in one request.

Change-Id: I2437019c47758134d85399b8183a9c6d70fd45b1
This commit is contained in:
James E. Blair 2015-04-11 20:44:14 -04:00
parent 5796b706bf
commit 110b7c37d4
1 changed files with 41 additions and 9 deletions

View File

@ -85,6 +85,17 @@ class MultiQueue(object):
finally:
self.condition.release()
def find(self, klass, priority):
results = []
self.condition.acquire()
try:
for item in self.queues[priority]:
if isinstance(item, klass):
results.append(item)
finally:
self.condition.release()
return results
class UpdateEvent(object):
def updateRelatedChanges(self, session, change):
@ -440,27 +451,36 @@ class SetSyncQueryUpdatedTask(Task):
sync_query = session.getSyncQueryByName(self.query_name)
sync_query.updated = self.updated
class SyncChangeByCommitTask(Task):
def __init__(self, commit, priority=NORMAL_PRIORITY):
super(SyncChangeByCommitTask, self).__init__(priority)
self.commit = commit
class SyncChangesByCommitsTask(Task):
def __init__(self, commits, priority=NORMAL_PRIORITY):
super(SyncChangesByCommitsTask, self).__init__(priority)
self.commits = commits
def __repr__(self):
return '<SyncChangeByCommitTask %s>' % (self.commit,)
return '<SyncChangesByCommitsTask %s>' % (self.commits,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.commit == self.commit):
other.commits == self.commits):
return True
return False
def run(self, sync):
query = 'commit:%s' % self.commit
query = ' OR '.join(['commit:%s' % x for x in self.commits])
changes = sync.get('changes/?q=%s' % query)
self.log.debug('Query: %s ' % (query,))
for c in changes:
sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
self.log.debug("Sync change %s for its commit %s" % (c['id'], self.commit))
self.log.debug("Sync change %s for its commit" % (c['id'],))
def addCommit(self, commit):
if commit in self.commits:
return True
# 100 should be under the URL length limit
if len(self.commits) >= 100:
return False
self.commits.append(commit)
return True
class SyncChangeByNumberTask(Task):
def __init__(self, number, priority=NORMAL_PRIORITY):
@ -592,7 +612,7 @@ class SyncChangeTask(Task):
if revision.parent not in parent_commits:
parent_revision = session.getRevisionByCommit(revision.parent)
if not parent_revision and change.status not in CLOSED_STATUSES:
sync.submitTask(SyncChangeByCommitTask(revision.parent, self.priority))
sync._syncChangeByCommit(revision.parent, self.priority)
self.log.debug("Change %s revision %s needs parent commit %s synced" %
(change.id, remote_revision['_number'], revision.parent))
parent_commits.add(revision.parent)
@ -1264,3 +1284,15 @@ class Sync(object):
if task.wait():
for subtask in task.tasks:
subtask.wait()
def _syncChangeByCommit(self, commit, priority):
# Accumulate sync change by commit tasks because they often
# come in batches. This method assumes it is being called
# from within the run queue already and therefore does not
# need to worry about locking the queue.
task = None
for task in self.queue.find(SyncChangesByCommitsTask, priority):
if task.addCommit(commit):
return
task = SyncChangesByCommitsTask([commit], priority)
self.submitTask(task)